Skip to content

23. MongoDB Aggregation Part 2. - MapReduce

앞의 포스팅에서 설명했던 집합(aggregation) 연산자인 count, distinct, group으로 할 수 있는 모든 것 뿐만 아니라 더 많은 일들을 맵리듀스를 통해 할 수 있습니다.

특히 다중의 서버를 통해 집합 연산자를 쉽게 병렬로 처리할 수 있습니다.

맵리듀스는 문제를 여러 개의 덩어리로 분할하고, 각 덩어리를 다양한 머쉰으로 전송하고, 각 머쉰이 문제의 각 부분을 해결하도록 합니다.

모든 머쉰에서 처리가 모두 마무리되면 솔루션 결과를 모두 모아서 전체적인 솔루션으로 합칩니다.

맵리듀스는 다음과 같은 절차로 처리됩니다:

  1. 첫번째 단계는 "맵(Map)"이며, 이는 연산을 컬렉션 내의 각 도큐먼트로 매핑하는 것입니다. 이 연산은 "아무 것도 하지 않는 것" 또는 "X값을 갖는 key들을 전송"하는 것입니다.
  2. 중간 단계는 "셔플(Shuffle)"이라 불리며, key들이 그룹화 되고 전송된 값의 리스트들이 각 key에 대해 생성되는 단계입니다. 리듀스는 이러한 값의 리스트를 취하고 이를 단일 엘리먼트로 줄입니다. 이 엘리먼트는 각 key가 단일 값을 포함하는 리스트를 가질 때까지 셔플 단계를 반복합니다.

다음 그림을 통해 맵리듀의 대략적인 개요를 이해하도록 하겠습니다.

orders 컬렉션은 cust_id, amount, status 3개의 key를 가지고 있으며, 그림과 같이 4개의 도큐먼트를 가지고 있다고 할 때, 처리 플로우를 살펴보겠습니다.

우선 query: {status: "A"}에 의해 status의 key 값이 "A"인 도큐먼트를 찾습니다.(query)

그런 다음, Map Function인 function() {emit( this.cust_id, this.amount); } 에 의해 동일한 cust_id 값을 갖는 도큐먼트끼리 그룹화 하되 이들이 가지고 있는 amount key 값과 연관 짓습니다.

Reduce Function을 살펴보면 2개의 파라미터를 취하는데 첫번째 파라미터인 "key"는 cust_id이며, 두번째 파라미터인 "values"는 amount입니다. Return 값으로 values(amount)의 합을 Array 형으로 계산합니다. 최종 결과값은 order_totals이며, 각 cust_id로 그룹화 된 도큐먼트의 amount를 합한 값입니다.

맵리듀스를 사용에 있어 손해보는 부분은 속도입니다: group도 그리 빠르지는 않지만, 맵리듀스는 이보다 느리며 실시간 처리로 사용하기에는 무리가 있습니다. 백그라운드로 맵리듀스를 실행하고, 결과 컬렉션을 생성한 후 실시간으로 이 컬렉션에 대한 쿼리를 실행할 수 있습니다.

맵리듀스 명령은 다음과 같은 형태를 갖습니다:

db.runCommand(
   {
     mapReduce: <collection>,
     map: <function>,
     reduce: <function>,
     out: <output>,
     query: <document>,
     sort: <document>,
     limit: <number>,
     finalize: <function>,
     scope: <document>,
     jsMode: <boolean>,
     verbose: <boolean>
   }
 )

주요 필드값에 대해 요약하면 다음과 같습니다:

필드 형태 설명
mapReduce collection 맵리듀스를 실행할 컬렉션 이름.
map JavaScript Function value와 key를 맵핑하고 key와 values 쌍을 전송(emit)하는 JavaScript 함수.
reduce JavaScript Function 하나의 오브젝트를 특정 key와 연관된 values로 "줄이는(reduce)"는 JavaScript 함수.
out string 출력 컬렉션의 이름.
query document 도큐먼트를 map 함수로 전송하기 전 도큐먼트를 선택하는 기준.
sort document 도큐먼트를 map 함수로 전송하기 전 도큐먼트를 분류. (limit 옵션과 함께 사용하면 유용함.)
limit number map 함수로 전송할 도큐먼트의 최대 개수.
finalize JavaScript Function reduce의 결과로 보내는 최종 단계.
scope document JavaScript 코드에서 사용되는 변수.
jsMode Boolean map 함수와 reduce 함수 실행 사이에 중간 데이터를 BSON 포맷을 변환할 지 여부 결정. * false인 경우: map 함수가 전송한 JavaScript 오브젝트를 BSON 포맷으로 변환함. 맵리듀스 연산은 중간 BSON 오브젝트를 임시 저장소에 저장. * true인 경우: map 함수가 전송한 JavaScript 오브젝트는 원래의 형태를 유지
verbose Boolean 서버 로그에서 더 많은 verbose 결과를 사용할지 여부 결정.

다음 예제를 통해 맵리듀스에 대해 더 자세하게 이해하도록 합니다 (예제는 MongoDB의 공식사이트의 맵리듀스 도큐먼트를 최대한 참고하여 약간의 재구성을 하였습니다).

우선 orders 라는 이름으로 다음과 같은 형태의 컬렉션을 준비합니다:

{
     _id: ObjectId("50a8240b927d5d8b5891743c"),
     cust_id: "abc123",
     ord_date: new Date("Oct 04, 2012"),
     status: 'A',
     price: 25,
     items: [ { sku: "mmm", qty: 5, price: 2.5 },
              { sku: "nnn", qty: 5, price: 2.5 } ]
}

그러나, 데이터를 위와 같은 형태로 하나씩 만드는 것은 꽤나 번거로운 일이므로 독자의 편의를 위해 다음의 내용을 MongoDB 쉘에 Copy + Paste합니다:

> db.orders.insert({cust_id: "abc123", ord_date: new Date("Oct 04, 2012"), status: 'A', price: 25, items: [ { sku: "mmm", qty: 5, price: 2.5 }, { sku: "nnn", qty: 4, price: 2.5 } ]})
> db.orders.insert({cust_id: "abc123", ord_date: new Date("Nov 03, 2011"), status: 'B', price: 40, items: [ { sku: "mmm", qty: 7, price: 4.5 }, { sku: "nnn", qty: 3, price: 3.5 } ]})
> db.orders.insert({cust_id: "def123", ord_date: new Date("Feb 03, 2014"), status: 'C', price: 15, items: [ { sku: "mmm", qty: 5, price: 4.0 }, { sku: "nnn", qty: 8, price: 3.0 } ]})
> db.orders.insert({cust_id: "def123", ord_date: new Date("Mar 03, 2014"), status: 'D', price: 17, items: [ { sku: "mmm", qty: 6, price: 2.0 }, { sku: "nnn", qty: 4, price: 1.7 } ]})
> db.orders.find().pretty()
{
  "_id" : ObjectId("531213442c4a45d0bbc868f4"),
  "cust_id" : "abc123",
  "ord_date" : ISODate("2012-10-03T15:00:00Z"),
  "status" : "A",
  "price" : 25,
  "items" : [
    {
      "sku" : "mmm",
      "qty" : 5,
      "price" : 2.5
    },
    {
      "sku" : "nnn",
      "qty" : 4,
      "price" : 2.5
    }
  ]
}
{
  "_id" : ObjectId("531213452c4a45d0bbc868f5"),
  "cust_id" : "abc123",
  "ord_date" : ISODate("2011-11-02T15:00:00Z"),
  "status" : "B",
  "price" : 40,
  "items" : [
    {
      "sku" : "mmm",
      "qty" : 7,
      "price" : 4.5
    },
    {
      "sku" : "nnn",
      "qty" : 3,
      "price" : 3.5
    }
  ]
}
{
  "_id" : ObjectId("531213462c4a45d0bbc868f6"),
  "cust_id" : "def123",
  "ord_date" : ISODate("2014-02-02T15:00:00Z"),
  "status" : "C",
  "price" : 15,
  "items" : [
    {
      "sku" : "mmm",
      "qty" : 5,
      "price" : 4
    },
    {
      "sku" : "nnn",
      "qty" : 8,
      "price" : 3
    }
  ]
}
{
  "_id" : ObjectId("531213472c4a45d0bbc868f7"),
  "cust_id" : "def123",
  "ord_date" : ISODate("2014-03-02T15:00:00Z"),
  "status" : "D",
  "price" : 17,
  "items" : [
    {
      "sku" : "mmm",
      "qty" : 6,
      "price" : 2
    },
    {
      "sku" : "nnn",
      "qty" : 4,
      "price" : 1.7
    }
  ]
}

[예제 1 : 고객 ID별 전체 구매가 계산]

상기 준비한 4개의 도큐먼트들에는 고객의 이름 대신 고객의 아이디(cust_id)로 고객 관리를 한다고 가정하였습니다 (고객의 이름이 동일할 수도 있으니 ID로 관리하는 것은 당연합니다).

첫번째 맵리듀스 예제는 각 고객이 주문한 제품의 총 구매가를 계산하는 것입니다.

준비된 도큐먼트들은 총 두명의 고객(abc123, def123)이 각각 두 개의 제품을 주문하였습니다.

각 고객별로 주문한 제품의 합계를 계산해 보도록 하겠습니다.

  1. 각 입력된 도큐먼트를 처리할 "map" 함수를 정의합니다:

  2. 함수에서 this는 맵리듀스 함수가 처리하고 있는 도큐먼트를 참조합니다.

  3. 함수는 각각의 도큐먼트에 대해 pricecust_id로 맵핑한 후, cust_idprice 짝을 방출("emit")합니다.
var mapFunction1 = function() {
  emit(this.cust_id, this.price);
};
  1. keyCustIDvaluesPrices, 두 개의 인자를 취하는 "reduce" 함수를 정의합니다:

  2. valuesPricesprice 값으로 구성된 배열(array)이며, "map" 함수가 방출하고 이를 keyCustId로 그룹화 한 값입니다.

  3. 함수는 valuesPrices 배열의 요소 합을 계산합니다.
var reduceFunction1 = function(keyCustId, valuesPrices) {
  return Array.sum(valuesPrices);
};
  1. 1번과 2번 과정에서 정의된 "map" 함수(mapFunction1)과 "reduce" 함수(reduceFunction1)을 이용하여 orders 컬렉션 내의 모든 도큐먼트들에 대해 맵-리듀스를 실행합니다:
db.orders.mapReduce(
    mapFunction1,
    reduceFunction1,
    { out: "map_reduce_example" }
)

실행 결과는 out인 map_reduce_example 컬렉션으로 저장됩니다.

3번까지 실행한 후 getCollectionNames() 메써드를 실행하여 현재 DB의 컬렉션 리스트를 출력하면 map_reduce_example 컬렉션이 생성되었음을 확인할 수 있을 것입니다:

> db.getCollectionNames()
[ "map_reduce_example", "orders", "system.indexes", "system.users" ]

이제 map_reduce_example 컬렉션 내의 도큐먼트 내용을 살펴보겠습니다:

> db.map_reduce_example.find().pretty()
{ "_id" : "abc123", "value" : 65 }
{ "_id" : "def123", "value" : 32 }

예상대로 cust_id: abc123의 주문 아이템의 price 합이 65, cust_id: def123에 대해서는 32의 계산 결과를 확인할 수 있습니다.


[예제 2 : 각 아이템에 대한 평균, 주문 및 전체 수량 계산하기]

두번째 예제에서는 orders 컬렉션의 주문일자(ord_date)가 01/01/2012 이후인 모든 도큐먼트에 대해 맵리듀스를 실행하도록 하겠습니다.

item.sku 필드로 그룹화하고, 각 sku에 대해 주문횟수와 총 주문수량을 계산합니다.

sku 값에 대해 주문 당 평균 수량을 계산하는 것으로 마무리합니다.

  1. 각 입력된 도큐먼트를 처리할 "map" 함수를 정의합니다:

  2. 함수에서 this는 map-reduce 함수가 처리하고 있는 도큐먼트를 참조합니다.

  3. 각 아이템에 대해, 함수는 sku와 새로운 오브젝트인 value와 연동합니다. valuecount 값 1과 주문에 대한 아이템 수량인 qty를 가지며, skuvalue 쌍을 방출합니다.
var mapFunction2 = function() {
  for (var idx = 0; idx < this.items.length; idx++) {
    var key = this.items[idx].sku;
    var value = {
      count: 1,
      qty: this.items[idx].qty
    };

    emit(key, value);
  }
};
  1. keySKUcountObjVals, 두 개의 인자를 취하는 "reduce" 함수 정의합니다:

  2. countObjVals는 배열(array) 값이며, 이 배열의 요소들은 "map" 함수에서 "reduce" 함수로 전달되는 keySKU 값으로 그룹화 되는 오브젝트들입니다.

  3. 함수는 countObjVals 배열을 countqty 필드를 포함하는 reducedValue라는 단일 오브젝트로 줄입니다.
  4. reducedVal에서 count 필드는 각각의 배열 요소로부터 count 필드들의 합(결국 전체 주문횟수를 의미)을 포함하며, qty 필드는 역시 각각의 배열 요소로부터 qty 필드들의 합(결국 전체 주문수량을 의미)을 포함합니다.
var reduceFunction2 = function(keySKU, countObjVals) {
  var reducedVal = { count: 0, qty: 0 };

  for (var idx = 0; idx < countObjVals.length; idx++) {
    reducedVal.count += countObjVals[idx].count;
    reducedVal.qty += countObjVals[idx].qty;
  }

  return reducedVal;
};
  1. keyreduceVal의 두 개의 인자를 취하는 finalize 함수를 정의한다. 이 함수는 reduceVal 오브젝트를 수정하여 avg라는 계산된 필드를 추가하고 이렇게 수정된 오브젝트를 반환합니다:
var finalizeFunction2 = function (key, reducedVal) {
  reducedVal.avg = reducedVal.qty/reducedVal.count;

  return reducedVal;
};
  1. mapFunction2, reduceFunction2, finalizeFunction2 함수를 이용하여 order 컬렉션에 대해 map-reduce를 연산을 수행합니다:
db.orders.mapReduce(
  mapFunction2,
  reduceFunction2,
  {
    out: { merge: "map_reduce_example" },
    query: { ord_date: { $gt: new Date('01/01/2012') }},
    finalize: finalizeFunction2
  }
)

예제 1과 마찬가지로 결과로 저장할 컬렉션(out)은 map_reduce_example로 동일한데, 예제 1을 수행한 후 map_reduce_example이 이미 존재하므로 이것에 예제 2결과를 추가하도록 merge 옵션을 사용하였습니다.

out 옵션에는 replace, merge, reduce 등 세 가지 옵션이 있습니다. place는 새로운 내용으로 대체, merge는 새로운 내용 추가, reducemerge와 기능이 전반적으로 유사하지만 기존의 도큐먼트와 새 도큐먼트 모두에 reduce 함수를 적용하는 기능이 추가적으로 존재합니다.

그 외 out 옵션을 잘 활용하면, 현재의 DB가 아닌 다른 이름의 DB로도 결과를 저장할 수 있으며, "sharding"인 경우에도 맵리듀스 결과를 보낼 수 있습니다.

query에는 주문일자(ord_date)가 2012년1월1일(01/01/2012, 날짜 정의 순서는 월/일/년, 즉 MM/DD/YYYY) 이후의 아이템에 대해서만 처리하는 쿼리를 요청하였습니다.

이렇게 1- 4번을 실행한 결과는 다음과 같습니다.

우선 getCollectionNames() 명령으로 현재 DB의 컬렉션 리스트를 살펴 보겠습니다:

> db.getCollectionNames()
[ "map_reduce_example", "orders", "system.indexes", "system.users" ]

반드시 map_reduce_example이 존재해야 합니다. 그 다음, map_reduce_example 컬렉션의 내용을 살펴보겠습니다:

> db.map_reduce_example.find().pretty()
{ "_id" : "abc123", "value" : 65 }
{ "_id" : "def123", "value" : 32 }
{
  "_id" : "mmm",
  "value" : {
    "count" : 3,
    "qty" : 16,
    "avg" : 5.333333333333333
  }
}
{
  "_id" : "nnn",
  "value" : {
    "count" : 3,
    "qty" : 16,
    "avg" : 5.333333333333333
  }
}
위와 같이 예제 1 결과 다음에 예제 2 결과 내용이 추가되었음을 확인할 수 있습니다.

앞서 준비한 도큐먼트 아이템에서 2012년1월1일 이후의 아이템에 대해 items > sku: mmmitems > sku: nnn에 대해 주문횟수(count), 주문수량(qty), 주문수량 평균(avg)를 직접 계산하여 결과와 비교해 보길 바라겠습니다.