[MongoDB] Aggregation Pipeline

2020. 12. 18. 23:20Database/MongoDB

1. Aggregation Pipeline

1) Pipeline

파이프라인이란, 이전 단계의 연산결과를 다음 단계 연산에 이용하는 것을 의미한다.

2) Aggregation Framework

MongoDB의 Aggregation Framework는 데이터 처리 파이프라인의 개념을 모델로 한다. 문서는 여러 단계의 파이프라인을 거쳐 변화하고 하나의 문서의 형태로 집계할 수 있다.

Aggregation Pipeline

3) aggregate()

aggregate() 메서드는 파이프라인 단계를 배열의 형태로 나타낸다. Document는 파이프라인 배열의 순서대로 가공되며, $out 및 $geoNear를 제외한 모든 단계는 파이프라인에 여러번 나타날 수 있다.

4) $match

$match는 조건에 만족하는 Document만 필터링하는데 사용한다.

{ "_id" : ObjectId("512bc95fe835e68f199c8686"), "author" : "dave", "score" : 80, "views" : 100 }
{ "_id" : ObjectId("512bc962e835e68f199c8687"), "author" : "dave", "score" : 85, "views" : 521 }
{ "_id" : ObjectId("55f5a192d4bede9ac365b257"), "author" : "ahn", "score" : 60, "views" : 1000 }
{ "_id" : ObjectId("55f5a192d4bede9ac365b258"), "author" : "li", "score" : 55, "views" : 5000 }
{ "_id" : ObjectId("55f5a1d3d4bede9ac365b259"), "author" : "annT", "score" : 60, "views" : 50 }
{ "_id" : ObjectId("55f5a1d3d4bede9ac365b25a"), "author" : "li", "score" : 94, "views" : 999 }
{ "_id" : ObjectId("55f5a1d3d4bede9ac365b25b"), "author" : "ty", "score" : 95, "views" : 1000 }
db.articles.aggregate(
    [ { $match : { author : "dave" } } ]
);

연산 결과는 다음과 같다.

{ "_id" : ObjectId("512bc95fe835e68f199c8686"), "author" : "dave", "score" : 80, "views" : 100 }
{ "_id" : ObjectId("512bc962e835e68f199c8687"), "author" : "dave", "score" : 85, "views" : 521 }

 

4) $group

$group는 Document에 대한 그룹핑 연산을 수행한다. 그룹에 대한 id를 지정해야하고, 특정 필드에 대한 집계 연산이 가능하다. 단, 연산된 Document에 대한 정렬을 지원하지 않는다.

{ $group: { _id: <expression>, <field1>: { <accumulator1> : <expression1> }, ... } }

 

{ "_id" : 1, "item" : "abc", "price" : 10, "quantity" : 2, "date" : ISODate("2014-03-01T08:00:00Z") }
{ "_id" : 2, "item" : "jkl", "price" : 20, "quantity" : 1, "date" : ISODate("2014-03-01T09:00:00Z") }
{ "_id" : 3, "item" : "xyz", "price" : 5, "quantity" : 10, "date" : ISODate("2014-03-15T09:00:00Z") }
{ "_id" : 4, "item" : "xyz", "price" : 5, "quantity" : 20, "date" : ISODate("2014-04-04T11:21:39.736Z") }
{ "_id" : 5, "item" : "abc", "price" : 10, "quantity" : 10, "date" : ISODate("2014-04-04T21:23:13.331Z") }

년/월/일을 기준으로 집계하고, $price와 $quantity를 곱한 값의 합을 totalPrice 필드로 지정한다. 또한 $quantity 필드 값의 평균을 averageQuantity 필드로 지정하고 그룹별 데이터의 갯수를 count로 지정한다. 

db.sales.aggregate(
   [
      {
        $group : {
           _id : { month: { $month: "$date" }, day: { $dayOfMonth: "$date" }, year: { $year: "$date" } },
           totalPrice: { $sum: { $multiply: [ "$price", "$quantity" ] } },
           averageQuantity: { $avg: "$quantity" },
           count: { $sum: 1 }
        }
      }
   ]
)

연산 결과는 다음과 같다.

{ 
    "_id" : { "month" : 3, "day" : 15, "year" : 2014 }, 
    "totalPrice" : 50, 
    "averageQuantity" : 10, 
    "count" : 1 
}
{ 
    "_id" : { "month" : 4, "day" : 4, "year" : 2014 }, 
    "totalPrice" : 200, 
    "averageQuantity" : 15, 
    "count" : 2 
}
{ 
    "_id" : { "month" : 3, "day" : 1, "year" : 2014 }, 
    "totalPrice" : 40, 
    "averageQuantity" : 1.5, 
    "count" : 2 
}

위의 예시에서 사용된 $sum 외의 다른 Accumulator은 다음과 같다.

Accumulator 설명
$addToSet id로 그룹핑한 데이터를 중복되지 않은 Set의 형태로 저장
$avg 숫자 값의 평균을 반환한다. 숫자가 아닌 값은 무시한다.
$first 각 그룹에 대한 첫 번째 Document의 값을 반환한다.
$last 각 그룹에 대한 마지막 Document의 값을 반환한다.
$max 각 그룹에서 가장 큰 값을 반환한다.
$mergeObjects 각 그룹에 대한 입력 Document를 조합하여 작성한 Document를 반환한다.
$min 각 그룹에서 가장 작은 값을 반환한다.
$push 각 그룹의 필드 값의 배열을 반환한다.
$stdDevPop 입력 값의 모집단 표준 편차를 반환한다.
$stdDevSamp 입력 값의 샘플 표준 편차를 반환한다.
$sum 각 그룹의 숫자형 데이터의 합을 반환한다. 숫자가 아닌 값은 무시한다.

5) $project

$project에서 지정한 필드 값을 다음 파이프라인 단계로 전달한다. RDBMS의 SELECT와 같은 역할을 수행한다. 필드의 값이 0일 경우 안보여주며 1일 경우 보여준다.

{
  "_id" : 1,
  title: "abc123",
  isbn: "0001122223334",
  author: { last: "zzz", first: "aaa" },
  copies: 5
}
db.books.aggregate( [ { $project : { title : 1 , author : 1 } } ] )

연산 결과는 다음과 같다.

{ "_id" : 1, "title" : "abc123", "author" : { "last" : "zzz", "first" : "aaa" } }

6) $sort

$sort는 정렬 조건에 맞게 파이프라인의 연산결과를 정렬한다. ASC는 1, DESC는 -1로 표현한다.

{ "_id" : 1, "subject" : "History", "score" : 88 }
{ "_id" : 2, "subject" : "History", "score" : 92 }
{ "_id" : 3, "subject" : "History", "score" : 97 }
{ "_id" : 4, "subject" : "History", "score" : 71 }
{ "_id" : 5, "subject" : "History", "score" : 79 }
{ "_id" : 6, "subject" : "History", "score" : 83 }
db.users.aggregate(
   [
     { $sort : { score : -1} }
   ]
)

연산 결과는 다음과 같다.

{ "_id" : 3, "subject" : "History", "score" : 97 }
{ "_id" : 2, "subject" : "History", "score" : 92 }
{ "_id" : 1, "subject" : "History", "score" : 88 }
{ "_id" : 6, "subject" : "History", "score" : 83 }
{ "_id" : 5, "subject" : "History", "score" : 79 }
{ "_id" : 4, "subject" : "History", "score" : 71 }

7) $skip

$skip은 입력한 갯수만큼 차례대로 Document를 skip한 데이터를 다음 파이프라인으로 전달한다.

db.article.aggregate(
    { $skip : 5 }
);

8) $sample

$sample은 컬렉션 내에서 입력한 갯수만큼 랜덤하게 Document를 출력한다.

{ "_id" : 1, "name" : "dave123", "q1" : true, "q2" : true }
{ "_id" : 2, "name" : "dave2", "q1" : false, "q2" : false  }
{ "_id" : 3, "name" : "ahn", "q1" : true, "q2" : true  }
{ "_id" : 4, "name" : "li", "q1" : true, "q2" : false  }
{ "_id" : 5, "name" : "annT", "q1" : false, "q2" : true  }
{ "_id" : 6, "name" : "li", "q1" : true, "q2" : true  }
{ "_id" : 7, "name" : "ty", "q1" : false, "q2" : true  }
db.users.aggregate(
   [ { $sample: { size: 3 } } ]
)

연산 결과는 다음과 같다.

{ "_id" : 2, "name" : "dave2", "q1" : false, "q2" : false  }
{ "_id" : 4, "name" : "li", "q1" : true, "q2" : false  }
{ "_id" : 7, "name" : "ty", "q1" : false, "q2" : true  }

9) $count

$count는 입력하는 문서 수의 카운트가 포함된 문서를 다음 단계로 전달한다.

{ "_id" : 1, "subject" : "History", "score" : 88 }
{ "_id" : 2, "subject" : "History", "score" : 92 }
{ "_id" : 3, "subject" : "History", "score" : 97 }
{ "_id" : 4, "subject" : "History", "score" : 71 }
{ "_id" : 5, "subject" : "History", "score" : 79 }
{ "_id" : 6, "subject" : "History", "score" : 83 }
db.scores.aggregate(
  [
    {
      $match: {
        score: {
          $gt: 80
        }
      }
    },
    {
      $count: "passing_scores"
    }
  ]
)

연산 결과는 다음과 같다.

{ "passing_scores" : 4 }

10) $addField

$addField는 Document에 새 필드를 추가한다. Document 및 새로 추가된 필드에서 모든 기준 필드가 포함된 문서를 출력한다. 실제 Document의 문서 내용을 변경하는 것이 아닌 조회를 하는 용도로 사용한다.

{
  _id: 1,
  student: "Maya",
  homework: [ 10, 5, 10 ],
  quiz: [ 10, 8 ],
  extraCredit: 0
}
{
  _id: 2,
  student: "Ryan",
  homework: [ 5, 6, 5 ],
  quiz: [ 8, 8 ],
  extraCredit: 8
}

집계 연산을 통해 homework, quiz 필드에 대한 배열의 합을 기존 Document에 추가한다.

db.scores.aggregate( [
   {
     $addFields: {
       totalHomework: { $sum: "$homework" } ,
       totalQuiz: { $sum: "$quiz" }
     }
   },
   {
     $addFields: { totalScore:
       { $add: [ "$totalHomework", "$totalQuiz", "$extraCredit" ] } }
   }
] )

연산 결과는 다음과 같다.

{
  "_id" : 1,
  "student" : "Maya",
  "homework" : [ 10, 5, 10 ],
  "quiz" : [ 10, 8 ],
  "extraCredit" : 0,
  "totalHomework" : 25,
  "totalQuiz" : 18,
  "totalScore" : 43
}
{
  "_id" : 2,
  "student" : "Ryan",
  "homework" : [ 5, 6, 5 ],
  "quiz" : [ 8, 8 ],
  "extraCredit" : 8,
  "totalHomework" : 16,
  "totalQuiz" : 16,
  "totalScore" : 40
}

11) $limit

$limit은 파이프라인 연산으로 출력된 Document의 갯수를 제한한다.

db.article.aggregate(
    { $limit : 5 }
);

12) $unwind

$unwind는 Document내의 배열 필드를 기반으로 각각의 Doucument로 분리한다.

{
  $unwind:
    {
      path: <field path>,
      includeArrayIndex: <string>,
      preserveNullAndEmptyArrays: <boolean>
    }
}
필드 설명
path 배열 필드의 필드 경로이다.
includeArrayIndex 요소의 배열 인덱스 값을 저장할 새 필드의 이름이다.
preserveNullAndEmptyArrays true일 경우 path 필드 값이 null, qls qodufdls ruddndp $unwind 연산 결과가 Document에 표시된다. false일 경우 $unwind 연산 경로가가 Document에 표시 되지 않는다.
{ "_id" : 1, "item" : "ABC1", sizes: [ "S", "M", "L"] }
db.inventory.aggregate( [ { $unwind : "$sizes" } ] )

연산 결과는 다음과 같다.

{ "_id" : 1, "item" : "ABC1", "sizes" : "S" }
{ "_id" : 1, "item" : "ABC1", "sizes" : "M" }
{ "_id" : 1, "item" : "ABC1", "sizes" : "L" }

[참고] jaehun2841.github.io/2019/02/24/2019-02-24-mongodb-2/#%EC%98%88%EC%A0%9C-3-preservenullandemptyarrays-%EC%86%8D%EC%84%B1%EC%9D%84-%EC%9D%B4%EC%9A%A9%ED%95%9C-%EC%B6%9C%EB%A0%A5

728x90

'Database > MongoDB' 카테고리의 다른 글

[MongoDB] 잠금, 트랜잭션  (0) 2020.12.19
[MongoDB] Index  (0) 2020.12.19
[MongoDB] Sharding (Shard, Config Server, Mongos)  (0) 2020.12.18
[MongoDB] Sharding  (0) 2020.12.18