일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- Deep learning
- 통계
- 몽고디비
- 빅 데이터
- Big Data
- nodeJS
- No SQL
- openCV
- 빅 데이타
- 주일설교
- 빅데이타
- Statistics
- 우리들교회
- 김양재 목사님
- 확률
- 김양재 목사
- 인공지능
- node.js
- WebGL
- MongoDB
- Machine Learning
- 빅데이터
- Artificial Intelligence
- 딥러닝
- c++
- probability
- R
- data science
- 데이터 과학
- 김양재
- Today
- Total
Scientific Computing & Data Science
[MongoDB] Aggregation / Map Reduce 본문
by Geol Choi |
앞의 글에서 설명했던 집합(aggregation) 연산자인 count, distinct, group으로 할 수 있는 모든 것 뿐만 아니라 더 많은 일들을 맵리듀스를 통해 할 수 있습니다. 특히 다중의 서버를 통해 집합 연산자를 쉽게 병렬로 처리할 수 있습니다. 맵리듀스는 문제를 여러 개의 덩어리로 분할하고, 각 덩어리를 다양한 머쉰으로 전송하고, 각 머쉰이 문제의 각 부분을 해결하도록 합니다. 모든 머쉰에서 처리가 모두 마무리되면 솔루션 결과를 모두 모아서 전체적인 솔루션으로 합칩니다.
맵리듀스는 다음과 같은 절차로 처리됩니다.
(1) 첫번째 단계는 "맵(Map)"이며, 이는 연산을 컬렉션 내의 각 도큐먼트로 매핑하는 것입니다. 이 연산은 "아무 것도 하지 않는 것" 또는 " X값을 갖는 key들을 전송"하는 것입니다.
(2) 중간 단계는 "셔플(Shuffle)"이라 불리며, key들이 그룹화 되고 전송된 값의 리스트들이 각 key에 대해 생성되는 단계입니다. 리듀스는 이러한 값의 리스트를 취하고 이를 단일 엘리먼트로 줄입니다. 이 엘리먼트는 각 key가 단일 값을 포함하는 리스트를 가질 때까지 셔플 단계를 반복합니다.
다음 그림을 통해 맵리듀의 대략적인 개요를 이해하도록 하겠습니다.
"orders" 컬렉션은 "cust_id", "amount", "status" 3개의 key를 가지고 있으며, 그림과 같이 4개의 도큐먼트를 가지고 있다고 할 때, 처리 플로우를 살펴보겠습니다.
우선 query: {status: "A"}에 의해 "status"의 key 값이 "A"인 도큐먼트를 찾습니다.(query)
그런 다음, Map Function인 function() {emit( this.cust_id, this.amount); } 에 의해 동일한 "cust_id" 값을 갖는 도큐먼트끼리 그룹화 하되 이들이 가지고 있는 "amount" key 값과 연관짓습니다.
Reduce Function을 살펴보면 2개의 파라미터를 취하는데 첫번째 파라미터인 "key"는 "cust_id"이며, 두번째 파라미터인 "values"는 "amount"입니다. Return 값으로 values(amount)의 합을 Array 형으로 계산합니다. 최종 결과값은 "order_totals"이며, 각 "cust_id"로 그룹화 된 도큐먼트의 "amount"를 합한 값입니다.
맵리듀스를 사용에 있어 손해보는 부분은 속도이다: "group"도 그리 빠르지는 않지만, 맵리듀스는 이보다 느리며 실시간 처리로 사용하기에는 무리가 있습니다. 백그라운드로 맵리듀스를 실행하고, 결과 컬렉션을 생성한 후 실시간으로 이 컬렉션에 대한 쿼리를 실행할 수 있습니다.
맵리듀스 명령은 다음과 같은 형태를 갖습니다:
db.runCommand(
{
mapReduce: <collection>,
map: <function>,
reduce: <function>,
out: <output>,
query: <document>,
sort: <document>,
limit: <number>,
finalize: <function>,
scope: <document>,
jsMode: <boolean>,
verbose: <boolean>
}
)
주요 필드값에 대해 요약하면 다음과 같습니다:
필드 | 형태 | 설명 |
mapReduce | collection | 맵리듀스를 실행할 컬렉션 이름. |
map | JavaScript | value와 key를 맵핑하고 key와 values 쌍을 전송(emit)하는 JavaScript 함수. |
reduce | JavaScript | 하나의 오브젝트를 특정 key와 연관된 values로 "줄이는(reduce)"는 JavaScript 함수. |
out | string | 출력 컬렉션의 이름. |
query | document | 도큐먼트를 map 함수로 전송하기 전 도큐먼트를 선택하는 기준. |
sort | document | 도큐먼트를 map 함수로 전송하기 전 도큐먼트를 분류. (limit 옵션과 함께 사용하면 유용함.) |
limit | number | map 함수로 전송할 도큐먼트의 최대 개수. |
finalize | JavaScript | reduce의 결과로 보내는 최종 단계. |
scope | document | JavaScript 코드에서 사용되는 변수. |
jsMode | Boolean | map 함수와 reduce 함수 실행 사이에 중간 데이터를 BSON 포맷을 변환할 지 여부 결정. false인 경우:
true인 경우:
|
verbose | Boolean | 서버 로그에서 더 많은 verbose 결과를 사용할지 여부 결정. |
다음 예제를 통해 맵리듀스에 대해 더 자세하게 이해하도록 합니다 (예제는 MongoDB의 공식사이트의 맵리듀스 도큐먼트를 최대한 참고하여 약간의 재구성을 하였습니다).
우선 "orders" 라는 이름으로 다음과 같은 형태의 컬렉션을 준비합니다:
{
_id: ObjectId("50a8240b927d5d8b5891743c"),
cust_id: "abc123",
ord_date: new Date("Oct 04, 2012"),
status: 'A',
price: 25,
items: [ { sku: "mmm", qty: 5, price: 2.5 },
{ sku: "nnn", qty: 5, price: 2.5 } ]
}
그러나, 데이터를 위와 같은 형태로 하나씩 만드는 것은 꽤나 번거로운 일이므로 독자의 편의를 위해 다음의 내용을 MongoDB 쉘에 Copy + Paste합니다:
> db.orders.insert({cust_id: "abc123", ord_date: new Date("Oct 04, 2012"), status: 'A', price: 25, items: [ { sku: "mmm", qty: 5, price: 2.5 }, { sku: "nnn", qty: 4, price: 2.5 } ]})
> db.orders.insert({cust_id: "abc123", ord_date: new Date("Nov 03, 2011"), status: 'B', price: 40, items: [ { sku: "mmm", qty: 7, price: 4.5 }, { sku: "nnn", qty: 3, price: 3.5 } ]})
> db.orders.insert({cust_id: "def123", ord_date: new Date("Feb 03, 2014"), status: 'C', price: 15, items: [ { sku: "mmm", qty: 5, price: 4.0 }, { sku: "nnn", qty: 8, price: 3.0 } ]})
> db.orders.insert({cust_id: "def123", ord_date: new Date("Mar 03, 2014"), status: 'D', price: 17, items: [ { sku: "mmm", qty: 6, price: 2.0 }, { sku: "nnn", qty: 4, price: 1.7 } ]})
> db.orders.find().pretty()
{
"_id" : ObjectId("531213442c4a45d0bbc868f4"),
"cust_id" : "abc123",
"ord_date" : ISODate("2012-10-03T15:00:00Z"),
"status" : "A",
"price" : 25,
"items" : [
{
"sku" : "mmm",
"qty" : 5,
"price" : 2.5
},
{
"sku" : "nnn",
"qty" : 4,
"price" : 2.5
}
]
}
{
"_id" : ObjectId("531213452c4a45d0bbc868f5"),
"cust_id" : "abc123",
"ord_date" : ISODate("2011-11-02T15:00:00Z"),
"status" : "B",
"price" : 40,
"items" : [
{
"sku" : "mmm",
"qty" : 7,
"price" : 4.5
},
{
"sku" : "nnn",
"qty" : 3,
"price" : 3.5
}
]
}
{
"_id" : ObjectId("531213462c4a45d0bbc868f6"),
"cust_id" : "def123",
"ord_date" : ISODate("2014-02-02T15:00:00Z"),
"status" : "C",
"price" : 15,
"items" : [
{
"sku" : "mmm",
"qty" : 5,
"price" : 4
},
{
"sku" : "nnn",
"qty" : 8,
"price" : 3
}
]
}
{
"_id" : ObjectId("531213472c4a45d0bbc868f7"),
"cust_id" : "def123",
"ord_date" : ISODate("2014-03-02T15:00:00Z"),
"status" : "D",
"price" : 17,
"items" : [
{
"sku" : "mmm",
"qty" : 6,
"price" : 2
},
{
"sku" : "nnn",
"qty" : 4,
"price" : 1.7
}
]
}
[예제 1 : 고객 ID별 전체 구매가 계산]
상기 준비한 4개의 도큐먼트들에는 고객의 이름 대신 고객의 아이디(cust_id)로 고객 관리를 한다고 가정하였습니다 (고객의 이름이 동일할 수도 있으니 ID로 관리하는 것은 당연합니다). 첫번째 맵리듀스 예제는 각 고객이 주문한 제품의 총 구매가를 계산하는 것입니다. 준비된 도큐먼트들은 총 두명의 고객("abc123", "def123")이 각각 두 개의 제품을 주문하였습니다. 각 고객별로 주문한 제품의 합계를 계산해 보도록 하겠습니다.
1. 각 입력된 도큐먼트를 처리할 "map" 함수를 정의합니다:
- 함수에서 "this"는 맵리듀스 함수가 처리하고 있는 도큐먼트를 참조합니다.
- 함수는 각각의 도큐먼트에 대해 "price"를 "cust_id"로 맵핑한 후, "cust_id"와 "price" 짝을 방출("emit")합니다.
var mapFunction1 = function() {
emit(this.cust_id, this.price);
};
2. "keyCustID"와 "valuesPrices", 두 개의 인자를 취하는 "reduce" 함수를 정의합니다:
- "valuesPrices"는 "price" 값으로 구성된 배열(array)이며, "map" 함수가 방출하고 이를 "keyCustId"로 그룹화 한 값입니다.
- 함수는 " valuesPrices" 배열의 요소 합을 계산합니다.
var reduceFunction1 = function(keyCustId, valuesPrices) {
return Array.sum(valuesPrices);
};
3. 1번과 2번 과정에서 정의된 "map" 함수(mapFunction1)과 "reduce" 함수(reduceFunction1)을 이용하여 "orders" 컬렉션 내의 모든 도큐먼트들에 대해 맵-리듀스를 실행합니다:
db.orders.mapReduce(
mapFunction1,
reduceFunction1,
{ out: "map_reduce_example" }
)
실행 결과는 out인 "map_reduce_example" 컬렉션으로 저장됩니다.
3번까지 실행한 후 "getCollectionNames()" 메써드를 실행하여 현재 DB의 컬렉션 리스트를 출력하면 "map_reduce_example" 컬렉션이 생성되었음을 확인할 수 있을 것입니다:
> db.getCollectionNames()
[ "map_reduce_example", "orders", "system.indexes", "system.users" ]
이제 "map_reduce_example" 컬렉션 내의 도큐먼트 내용을 살펴보겠습니다:
> db.map_reduce_example.find().pretty()
{ "_id" : "abc123", "value" : 65 }
{ "_id" : "def123", "value" : 32 }
예상대로 "cust_id: abc123"의 주문 아이템의 "price" 합이 65, "cust_id: def123"에 대해서는 32의 계산 결과를 확인할 수 있습니다.
[예제 2 : 각 아이템에 대한 평균, 주문 및 전체 수량 계산하기]
두번째 예제에서는 "orders" 컬렉션의 주문일자(ord_date)가 01/01/2012 이후인 모든 도큐먼트에 대해 맵리듀스를 실행하도록 하겠습니다. "item.sku" 필드로 그룹화하고, 각 "sku"에 대해 주문횟수와 총 주문수량을 계산합니다. 각 "sku" 값에 대해 주문 당 평균 수량을 계산하는 것으로 마무리합니다.
1. 각 입력된 도큐먼트를 처리할 "map" 함수를 정의합니다:
- 함수에서 "this"는 map-reduce 함수가 처리하고 있는 도큐먼트를 참조합니다.
- 각 아이템에 대해, 함수는 "sku"와 새로운 오브젝트인 "value"와 연동한다. "value"는 "count" 값 1과 주문에 대한 아이템 수량인 "qty"를 가지며, "sku"와 "value" 쌍을 방출합니다.
var mapFunction2 = function() {
for (var idx = 0; idx < this.items.length; idx++) {
var key = this.items[idx].sku;
var value = {
count: 1,
qty: this.items[idx].qty
};
emit(key, value);
}
};
2. "keySKU"와 "countObjVals", 두 개의 인자를 취하는 "reduce" 함수 정의합니다:
- "countObjVals"는 배열(array) 값이며, 이 배열의 요소들은 "map" 함수에서 "reduce" 함수로 전달되는 "keySKU" 값으로 그룹화 되는 오브젝트들입니다.
- 함수는 "countObjVals" 배열을 "count"와 "qty" 필드를 포함하는 "reducedValue"라는 단일 오브젝트로 줄입니다.
- "reducedVal"에서 "count" 필드는 각각의 배열 요소로부터 "count" 필드들의 합(결국 전체 주문횟수를 의미)을 포함하며, "qty" 필드는 역시 각각의 배열 요소로부터 "qty" 필드들의 합(결국 전체 주문수량을 의미)을 포함합니다.
var reduceFunction2 = function(keySKU, countObjVals) {
var reducedVal = { count: 0, qty: 0 };
for (var idx = 0; idx < countObjVals.length; idx++) {
reducedVal.count += countObjVals[idx].count;
reducedVal.qty += countObjVals[idx].qty;
}
return reducedVal;
};
3. "key"와 "reduceVal"의 두 개의 인자를 취하는 "finalize" 함수를 정의한다. 이 함수는 "reduceVal" 오브젝트를 수정하여 "avg"라는 계산된 필드를 추가하고 이렇게 수정된 오브젝트를 반환합니다:
var finalizeFunction2 = function (key, reducedVal) {
reducedVal.avg = reducedVal.qty/reducedVal.count;
return reducedVal;
};
4. "mapFunction2", "reduceFunction2", "finalizeFunction2" 함수를 이용하여 "order" 컬렉션에 대해 map-reduce를 연산을 수행합니다:
db.orders.mapReduce(
mapFunction2,
reduceFunction2,
{
out: { merge: "map_reduce_example" },
query: { ord_date: { $gt: new Date('01/01/2012') }},
finalize: finalizeFunction2
}
)
예제 1과 마찬가지로 결과로 저장할 컬렉션(out)은 "map_reduce_example"로 동일한데, 예제 1을 수행한 후 "map_reduce_example"이 이미 존재하므로 이것에 예제 2결과를 추가하도록 "merge" 옵션을 사용하였습니다. "out" 옵션에는 "replace", "merge", "reduce" 등 세 가지 옵션이 있습니다. "place"는 새로운 내용으로 대체, "merge"는 새로운 내용 추가, "reduce"는 "merge"와 기능이 전반적으로 유사하지만 기존의 도큐먼트와 새 도큐먼트 모두에 "reduce" 함수를 적용하는 기능이 추가적으로 존재합니다.
그 외 "out" 옵션을 잘 활용하면, 현재의 DB가 아닌 다른 이름의 DB로도 결과를 저장할 수 있으며, "sharding"인 경우에도 맵리듀스 결과를 보낼 수 있습니다.
"query"에는 주문일자(ord_date)가 2012년1월1일(01/01/2012, 날짜 정의 순서는 월/일/년, 즉 MM/DD/YYYY) 이후의 아이템에 대해서만 처리하는 쿼리를 요청하였습니다.
이렇게 1- 4번을 실행한 결과는 다음과 같습니다.
우선 "getCollectionNames()" 명령으로 현재 DB의 컬렉션 리스트를 살펴 보겠습니다:
> db.getCollectionNames()
[ "map_reduce_example", "orders", "system.indexes", "system.users" ]
반드시 "map_reduce_example"이 존재해야 합니다. 그 다음, "map_reduce_example" 컬렉션의 내용을 살펴보겠습니다:
> db.map_reduce_example.find().pretty()
{ "_id" : "abc123", "value" : 65 }
{ "_id" : "def123", "value" : 32 }
{
"_id" : "mmm",
"value" : {
"count" : 3,
"qty" : 16,
"avg" : 5.333333333333333
}
}
{
"_id" : "nnn",
"value" : {
"count" : 3,
"qty" : 16,
"avg" : 5.333333333333333
}
}
위와 같이 예제 1 결과 다음에 예제 2 결과 내용이 추가되었음을 확인할 수 있습니다. 앞서 준비한 도큐먼트 아이템에서 2012년1월1일 이후의 아이템에 대해 "items > sku: mmm"과 "items > sku: nnn"에 대해 주문횟수(count), 주문수량(qty), 주문수량 평균(avg)를 직접 계산하여 결과와 비교해 보길 바라겠습니다.
'Data Science > MongoDB' 카테고리의 다른 글
[MongoDB] Capped Collections (0) | 2014.03.06 |
---|---|
[MongoDB] Advanced Topics / DB Commands 정리 (0) | 2014.03.04 |
[MongoDB] Aggregation / The Basic (0) | 2014.02.16 |
[MongoDB] Query / $snapshot (0) | 2014.02.09 |
[MongoDB] Query / Other Query Operations (0) | 2014.02.08 |