23. MongoDB Aggregation Part 2. - MapReduce
앞의 포스팅에서 설명했던 집합(aggregation) 연산자인 count
, distinct
, group
으로 할 수 있는 모든 것 뿐만 아니라 더 많은 일들을 맵리듀스를 통해 할 수 있습니다.
특히 다중의 서버를 통해 집합 연산자를 쉽게 병렬로 처리할 수 있습니다.
맵리듀스는 문제를 여러 개의 덩어리로 분할하고, 각 덩어리를 다양한 머쉰으로 전송하고, 각 머쉰이 문제의 각 부분을 해결하도록 합니다.
모든 머쉰에서 처리가 모두 마무리되면 솔루션 결과를 모두 모아서 전체적인 솔루션으로 합칩니다.
맵리듀스는 다음과 같은 절차로 처리됩니다:
- 첫번째 단계는 "맵(Map)"이며, 이는 연산을 컬렉션 내의 각 도큐먼트로 매핑하는 것입니다. 이 연산은 "아무 것도 하지 않는 것" 또는 "X값을 갖는 key들을 전송"하는 것입니다.
- 중간 단계는 "셔플(Shuffle)"이라 불리며, key들이 그룹화 되고 전송된 값의 리스트들이 각 key에 대해 생성되는 단계입니다. 리듀스는 이러한 값의 리스트를 취하고 이를 단일 엘리먼트로 줄입니다. 이 엘리먼트는 각 key가 단일 값을 포함하는 리스트를 가질 때까지 셔플 단계를 반복합니다.
다음 그림을 통해 맵리듀의 대략적인 개요를 이해하도록 하겠습니다.
orders
컬렉션은 cust_id
, amount
, status
3개의 key를 가지고 있으며, 그림과 같이 4개의 도큐먼트를 가지고 있다고 할 때, 처리 플로우를 살펴보겠습니다.
우선 query: {status: "A"}
에 의해 status
의 key 값이 "A"인 도큐먼트를 찾습니다.(query)
그런 다음, Map Function인 function() {emit( this.cust_id, this.amount); }
에 의해 동일한 cust_id
값을 갖는 도큐먼트끼리 그룹화 하되 이들이 가지고 있는 amount
key 값과 연관 짓습니다.
Reduce Function을 살펴보면 2개의 파라미터를 취하는데 첫번째 파라미터인 "key"는 cust_id
이며, 두번째 파라미터인 "values"는 amount
입니다. Return 값으로 values(amount)의 합을 Array 형으로 계산합니다. 최종 결과값은 order_totals
이며, 각 cust_id
로 그룹화 된 도큐먼트의 amount
를 합한 값입니다.
맵리듀스를 사용에 있어 손해보는 부분은 속도입니다: group
도 그리 빠르지는 않지만, 맵리듀스는 이보다 느리며 실시간 처리로 사용하기에는 무리가 있습니다. 백그라운드로 맵리듀스를 실행하고, 결과 컬렉션을 생성한 후 실시간으로 이 컬렉션에 대한 쿼리를 실행할 수 있습니다.
맵리듀스 명령은 다음과 같은 형태를 갖습니다:
db.runCommand(
{
mapReduce: <collection>,
map: <function>,
reduce: <function>,
out: <output>,
query: <document>,
sort: <document>,
limit: <number>,
finalize: <function>,
scope: <document>,
jsMode: <boolean>,
verbose: <boolean>
}
)
주요 필드값에 대해 요약하면 다음과 같습니다:
필드 | 형태 | 설명 |
---|---|---|
mapReduce | collection | 맵리듀스를 실행할 컬렉션 이름. |
map | JavaScript Function | value와 key를 맵핑하고 key와 values 쌍을 전송(emit)하는 JavaScript 함수. |
reduce | JavaScript Function | 하나의 오브젝트를 특정 key와 연관된 values로 "줄이는(reduce)"는 JavaScript 함수. |
out | string | 출력 컬렉션의 이름. |
query | document | 도큐먼트를 map 함수로 전송하기 전 도큐먼트를 선택하는 기준. |
sort | document | 도큐먼트를 map 함수로 전송하기 전 도큐먼트를 분류. (limit 옵션과 함께 사용하면 유용함.) |
limit | number | map 함수로 전송할 도큐먼트의 최대 개수. |
finalize | JavaScript Function | reduce의 결과로 보내는 최종 단계. |
scope | document | JavaScript 코드에서 사용되는 변수. |
jsMode | Boolean | map 함수와 reduce 함수 실행 사이에 중간 데이터를 BSON 포맷을 변환할 지 여부 결정. * false인 경우: map 함수가 전송한 JavaScript 오브젝트를 BSON 포맷으로 변환함. 맵리듀스 연산은 중간 BSON 오브젝트를 임시 저장소에 저장. * true인 경우: map 함수가 전송한 JavaScript 오브젝트는 원래의 형태를 유지 |
verbose | Boolean | 서버 로그에서 더 많은 verbose 결과를 사용할지 여부 결정. |
다음 예제를 통해 맵리듀스에 대해 더 자세하게 이해하도록 합니다 (예제는 MongoDB의 공식사이트의 맵리듀스 도큐먼트를 최대한 참고하여 약간의 재구성을 하였습니다).
우선 orders
라는 이름으로 다음과 같은 형태의 컬렉션을 준비합니다:
{
_id: ObjectId("50a8240b927d5d8b5891743c"),
cust_id: "abc123",
ord_date: new Date("Oct 04, 2012"),
status: 'A',
price: 25,
items: [ { sku: "mmm", qty: 5, price: 2.5 },
{ sku: "nnn", qty: 5, price: 2.5 } ]
}
그러나, 데이터를 위와 같은 형태로 하나씩 만드는 것은 꽤나 번거로운 일이므로 독자의 편의를 위해 다음의 내용을 MongoDB 쉘에 Copy + Paste합니다:
> db.orders.insert({cust_id: "abc123", ord_date: new Date("Oct 04, 2012"), status: 'A', price: 25, items: [ { sku: "mmm", qty: 5, price: 2.5 }, { sku: "nnn", qty: 4, price: 2.5 } ]})
> db.orders.insert({cust_id: "abc123", ord_date: new Date("Nov 03, 2011"), status: 'B', price: 40, items: [ { sku: "mmm", qty: 7, price: 4.5 }, { sku: "nnn", qty: 3, price: 3.5 } ]})
> db.orders.insert({cust_id: "def123", ord_date: new Date("Feb 03, 2014"), status: 'C', price: 15, items: [ { sku: "mmm", qty: 5, price: 4.0 }, { sku: "nnn", qty: 8, price: 3.0 } ]})
> db.orders.insert({cust_id: "def123", ord_date: new Date("Mar 03, 2014"), status: 'D', price: 17, items: [ { sku: "mmm", qty: 6, price: 2.0 }, { sku: "nnn", qty: 4, price: 1.7 } ]})
> db.orders.find().pretty()
{
"_id" : ObjectId("531213442c4a45d0bbc868f4"),
"cust_id" : "abc123",
"ord_date" : ISODate("2012-10-03T15:00:00Z"),
"status" : "A",
"price" : 25,
"items" : [
{
"sku" : "mmm",
"qty" : 5,
"price" : 2.5
},
{
"sku" : "nnn",
"qty" : 4,
"price" : 2.5
}
]
}
{
"_id" : ObjectId("531213452c4a45d0bbc868f5"),
"cust_id" : "abc123",
"ord_date" : ISODate("2011-11-02T15:00:00Z"),
"status" : "B",
"price" : 40,
"items" : [
{
"sku" : "mmm",
"qty" : 7,
"price" : 4.5
},
{
"sku" : "nnn",
"qty" : 3,
"price" : 3.5
}
]
}
{
"_id" : ObjectId("531213462c4a45d0bbc868f6"),
"cust_id" : "def123",
"ord_date" : ISODate("2014-02-02T15:00:00Z"),
"status" : "C",
"price" : 15,
"items" : [
{
"sku" : "mmm",
"qty" : 5,
"price" : 4
},
{
"sku" : "nnn",
"qty" : 8,
"price" : 3
}
]
}
{
"_id" : ObjectId("531213472c4a45d0bbc868f7"),
"cust_id" : "def123",
"ord_date" : ISODate("2014-03-02T15:00:00Z"),
"status" : "D",
"price" : 17,
"items" : [
{
"sku" : "mmm",
"qty" : 6,
"price" : 2
},
{
"sku" : "nnn",
"qty" : 4,
"price" : 1.7
}
]
}
[예제 1 : 고객 ID별 전체 구매가 계산]
상기 준비한 4개의 도큐먼트들에는 고객의 이름 대신 고객의 아이디(cust_id
)로 고객 관리를 한다고 가정하였습니다 (고객의 이름이 동일할 수도 있으니 ID로 관리하는 것은 당연합니다).
첫번째 맵리듀스 예제는 각 고객이 주문한 제품의 총 구매가를 계산하는 것입니다.
준비된 도큐먼트들은 총 두명의 고객(abc123
, def123
)이 각각 두 개의 제품을 주문하였습니다.
각 고객별로 주문한 제품의 합계를 계산해 보도록 하겠습니다.
-
각 입력된 도큐먼트를 처리할 "map" 함수를 정의합니다:
-
함수에서
this
는 맵리듀스 함수가 처리하고 있는 도큐먼트를 참조합니다. - 함수는 각각의 도큐먼트에 대해
price
를cust_id
로 맵핑한 후,cust_id
와price
짝을 방출("emit")합니다.
var mapFunction1 = function() {
emit(this.cust_id, this.price);
};
-
keyCustID
와valuesPrices
, 두 개의 인자를 취하는 "reduce" 함수를 정의합니다: -
valuesPrices
는price
값으로 구성된 배열(array)이며, "map" 함수가 방출하고 이를keyCustId
로 그룹화 한 값입니다. - 함수는
valuesPrices
배열의 요소 합을 계산합니다.
var reduceFunction1 = function(keyCustId, valuesPrices) {
return Array.sum(valuesPrices);
};
- 1번과 2번 과정에서 정의된 "map" 함수(mapFunction1)과 "reduce" 함수(reduceFunction1)을 이용하여
orders
컬렉션 내의 모든 도큐먼트들에 대해 맵-리듀스를 실행합니다:
db.orders.mapReduce(
mapFunction1,
reduceFunction1,
{ out: "map_reduce_example" }
)
실행 결과는 out인 map_reduce_example
컬렉션으로 저장됩니다.
3번까지 실행한 후 getCollectionNames()
메써드를 실행하여 현재 DB의 컬렉션 리스트를 출력하면 map_reduce_example
컬렉션이 생성되었음을 확인할 수 있을 것입니다:
> db.getCollectionNames()
[ "map_reduce_example", "orders", "system.indexes", "system.users" ]
이제 map_reduce_example
컬렉션 내의 도큐먼트 내용을 살펴보겠습니다:
> db.map_reduce_example.find().pretty()
{ "_id" : "abc123", "value" : 65 }
{ "_id" : "def123", "value" : 32 }
예상대로 cust_id: abc123
의 주문 아이템의 price
합이 65, cust_id: def123
에 대해서는 32의 계산 결과를 확인할 수 있습니다.
[예제 2 : 각 아이템에 대한 평균, 주문 및 전체 수량 계산하기]
두번째 예제에서는 orders
컬렉션의 주문일자(ord_date
)가 01/01/2012 이후인 모든 도큐먼트에 대해 맵리듀스를 실행하도록 하겠습니다.
item.sku
필드로 그룹화하고, 각 sku
에 대해 주문횟수와 총 주문수량을 계산합니다.
각 sku
값에 대해 주문 당 평균 수량을 계산하는 것으로 마무리합니다.
-
각 입력된 도큐먼트를 처리할 "map" 함수를 정의합니다:
-
함수에서
this
는 map-reduce 함수가 처리하고 있는 도큐먼트를 참조합니다. - 각 아이템에 대해, 함수는
sku
와 새로운 오브젝트인value
와 연동합니다.value
는count
값 1과 주문에 대한 아이템 수량인qty
를 가지며,sku
와value
쌍을 방출합니다.
var mapFunction2 = function() {
for (var idx = 0; idx < this.items.length; idx++) {
var key = this.items[idx].sku;
var value = {
count: 1,
qty: this.items[idx].qty
};
emit(key, value);
}
};
-
keySKU
와countObjVals
, 두 개의 인자를 취하는 "reduce" 함수 정의합니다: -
countObjVals
는 배열(array) 값이며, 이 배열의 요소들은 "map" 함수에서 "reduce" 함수로 전달되는keySKU
값으로 그룹화 되는 오브젝트들입니다. - 함수는
countObjVals
배열을count
와qty
필드를 포함하는reducedValue
라는 단일 오브젝트로 줄입니다. reducedVal
에서count
필드는 각각의 배열 요소로부터count
필드들의 합(결국 전체 주문횟수를 의미)을 포함하며,qty
필드는 역시 각각의 배열 요소로부터qty
필드들의 합(결국 전체 주문수량을 의미)을 포함합니다.
var reduceFunction2 = function(keySKU, countObjVals) {
var reducedVal = { count: 0, qty: 0 };
for (var idx = 0; idx < countObjVals.length; idx++) {
reducedVal.count += countObjVals[idx].count;
reducedVal.qty += countObjVals[idx].qty;
}
return reducedVal;
};
key
와reduceVal
의 두 개의 인자를 취하는finalize
함수를 정의한다. 이 함수는reduceVal
오브젝트를 수정하여avg
라는 계산된 필드를 추가하고 이렇게 수정된 오브젝트를 반환합니다:
var finalizeFunction2 = function (key, reducedVal) {
reducedVal.avg = reducedVal.qty/reducedVal.count;
return reducedVal;
};
mapFunction2
,reduceFunction2
,finalizeFunction2
함수를 이용하여order
컬렉션에 대해 map-reduce를 연산을 수행합니다:
db.orders.mapReduce(
mapFunction2,
reduceFunction2,
{
out: { merge: "map_reduce_example" },
query: { ord_date: { $gt: new Date('01/01/2012') }},
finalize: finalizeFunction2
}
)
예제 1과 마찬가지로 결과로 저장할 컬렉션(out)은 map_reduce_example
로 동일한데, 예제 1을 수행한 후 map_reduce_example
이 이미 존재하므로 이것에 예제 2결과를 추가하도록 merge
옵션을 사용하였습니다.
out
옵션에는 replace
, merge
, reduce
등 세 가지 옵션이 있습니다. place
는 새로운 내용으로 대체, merge
는 새로운 내용 추가, reduce
는 merge
와 기능이 전반적으로 유사하지만 기존의 도큐먼트와 새 도큐먼트 모두에 reduce
함수를 적용하는 기능이 추가적으로 존재합니다.
그 외 out
옵션을 잘 활용하면, 현재의 DB가 아닌 다른 이름의 DB로도 결과를 저장할 수 있으며, "sharding"인 경우에도 맵리듀스 결과를 보낼 수 있습니다.
query
에는 주문일자(ord_date)가 2012년1월1일(01/01/2012, 날짜 정의 순서는 월/일/년, 즉 MM/DD/YYYY) 이후의 아이템에 대해서만 처리하는 쿼리를 요청하였습니다.
이렇게 1- 4번을 실행한 결과는 다음과 같습니다.
우선 getCollectionNames()
명령으로 현재 DB의 컬렉션 리스트를 살펴 보겠습니다:
> db.getCollectionNames()
[ "map_reduce_example", "orders", "system.indexes", "system.users" ]
반드시 map_reduce_example
이 존재해야 합니다. 그 다음, map_reduce_example
컬렉션의 내용을 살펴보겠습니다:
> db.map_reduce_example.find().pretty()
{ "_id" : "abc123", "value" : 65 }
{ "_id" : "def123", "value" : 32 }
{
"_id" : "mmm",
"value" : {
"count" : 3,
"qty" : 16,
"avg" : 5.333333333333333
}
}
{
"_id" : "nnn",
"value" : {
"count" : 3,
"qty" : 16,
"avg" : 5.333333333333333
}
}
앞서 준비한 도큐먼트 아이템에서 2012년1월1일 이후의 아이템에 대해 items > sku: mmm
과 items > sku: nnn
에 대해 주문횟수(count
), 주문수량(qty
), 주문수량 평균(avg
)를 직접 계산하여 결과와 비교해 보길 바라겠습니다.