Data Processing with the MongoDB Aggregation Pipeline
The MongoDB aggregation pipeline passes documents through a sequence of stages, each of which transforms the document stream before handing it to the next stage. Stages for filtering, grouping, and reshaping can be combined to support complex data analysis and reporting.
Pipeline Stages
$match: Filtering Records
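The $match stage filters the document stream so that only documents satisfying the given query conditions pass to the next stage. Placing $match early in the pipeline usually improves performance because later stages process fewer documents; when $match is the first stage, it can also take advantage of indexes on the collection.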
db.transactions.aggregate([
{ $match: { created_at: { $gte: new Date("2022-06-01") } } }
]);
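To illustrate the ordering advice, the following sketch puts $match ahead of $group so that only recent transactions are grouped. It reuses field names from the examples in this article (created_at, category, units_sold); the variable name recentByCategory and the output field unitsSold are made up for illustration. The guarded call only runs where a db object exists, such as in mongosh.

```javascript
// Filter first, then group: $group only sees documents that passed $match.
// `recentByCategory` and `unitsSold` are illustrative names, not part of any API.
const recentByCategory = [
  { $match: { created_at: { $gte: new Date("2022-06-01") } } },
  { $group: { _id: "$category", unitsSold: { $sum: "$units_sold" } } }
];

// In mongosh, `db` is defined and the pipeline runs against the collection.
// In plain Node.js there is no `db`, so this call is skipped.
if (typeof db !== "undefined") {
  printjson(db.transactions.aggregate(recentByCategory).toArray());
}
```

Swapping the two stages would not work as a drop-in change: $group would process every transaction, and its output documents no longer contain created_at to filter on.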
$group: Aggregating Values
The $group stage groups input documents by a specified _id expression and applies accumulator expressions such as $sum, $avg, and $max to each group, emitting one output document per distinct _id value.
db.transactions.aggregate([
  {
    $group: {
      _id: "$category",
      totalRevenue: { $sum: { $multiply: ["$units_sold", "$unit_price"] } }
    }
  }
]);
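To make the grouping semantics concrete, here is a rough plain-JavaScript emulation of the totalRevenue calculation above. It is only a sketch of what the stage computes, not how MongoDB executes it; the sample documents and the function name groupRevenueByCategory are invented for illustration.

```javascript
// Invented sample documents using the field names from the $group example.
const sampleTransactions = [
  { category: "books", units_sold: 2, unit_price: 10 },
  { category: "books", units_sold: 1, unit_price: 30 },
  { category: "games", units_sold: 3, unit_price: 20 }
];

// Emulates { $group: { _id: "$category", totalRevenue: { $sum: { $multiply: [...] } } } }.
function groupRevenueByCategory(docs) {
  const totals = new Map();
  for (const doc of docs) {
    const revenue = doc.units_sold * doc.unit_price;
    totals.set(doc.category, (totals.get(doc.category) || 0) + revenue);
  }
  // One output document per distinct category, keyed by _id as $group does.
  return [...totals].map(([category, totalRevenue]) => ({ _id: category, totalRevenue }));
}

const grouped = groupRevenueByCategory(sampleTransactions);
console.log(grouped);
// [ { _id: 'books', totalRevenue: 50 }, { _id: 'games', totalRevenue: 60 } ]
```

The emulation returns groups in first-seen order, whereas $group itself makes no promise about the order of its output documents.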
$project: Reshaping Documents
The $project stage reshapes each document in the stream, allowing you to include, exclude, rename, or compute fields. The _id field is included by default; set _id: 0 to omit it.
db.transactions.aggregate([
{ $project: { itemType: "$category", soldCount: "$units_sold", _id: 0 } }
]);
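The example above only renames fields. As a sketch of the "compute new fields" case, the pipeline below adds a per-transaction revenue value using the same $multiply expression that appears in the $group example. The names revenuePerTransaction and revenue are illustrative choices, not anything defined by MongoDB.

```javascript
// Keep category, add a computed revenue field, and drop _id.
// `revenuePerTransaction` and `revenue` are illustrative names.
const revenuePerTransaction = [
  {
    $project: {
      _id: 0,
      category: 1,
      revenue: { $multiply: ["$units_sold", "$unit_price"] }
    }
  }
];

// Runs only where `db` exists (for example in mongosh); skipped in plain Node.js.
if (typeof db !== "undefined") {
  printjson(db.transactions.aggregate(revenuePerTransaction).toArray());
}
```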
$sort: Ordering Results
The $sort stage reorders the document stream by a specified sort key. Use 1 for ascending and -1 for descending order.
db.transactions.aggregate([
{ $group: { _id: "$category", totalRevenue: { $sum: { $multiply: ["$units_sold", "$unit_price"] } } } },
{ $sort: { totalRevenue: -1 } }
]);
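A sort specification can name more than one key, in which case the keys are applied in the order they are written. The sketch below extends the pipeline above with _id as a tie-breaker, so categories with equal revenue come out in a predictable order; the variable name rankedCategories is made up for illustration.

```javascript
// `rankedCategories` is an illustrative name.
const rankedCategories = [
  {
    $group: {
      _id: "$category",
      totalRevenue: { $sum: { $multiply: ["$units_sold", "$unit_price"] } }
    }
  },
  // Highest revenue first; groups with equal revenue are ordered by _id ascending.
  { $sort: { totalRevenue: -1, _id: 1 } }
];

// Runs only where `db` exists (for example in mongosh); skipped in plain Node.js.
if (typeof db !== "undefined") {
  printjson(db.transactions.aggregate(rankedCategories).toArray());
}
```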
$lookup: Joining Collections
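The $lookup stage performs a left outer join with another collection in the same database. Each input document gains an array field, named by the as option, that holds the matching documents from the joined collection; if no documents match, the array is empty.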
db.purchases.aggregate([
  {
    $lookup: {
      from: "inventory",
      localField: "sku_code",
      foreignField: "_id",
      as: "inventory_details"
    }
  }
]);
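The left outer join behavior can be sketched in plain JavaScript. This is an emulation of the equality match shown above, not MongoDB's implementation; the sample documents, the quantity and description fields, and the function name lookupInventory are all invented for illustration.

```javascript
// Invented sample data: the second purchase has no matching inventory document.
const samplePurchases = [
  { _id: 1, sku_code: "A100", quantity: 2 },
  { _id: 2, sku_code: "Z999", quantity: 1 }
];
const sampleInventory = [
  { _id: "A100", description: "notebook" },
  { _id: "B200", description: "pencil" }
];

// Emulates $lookup with localField "sku_code" and foreignField "_id":
// every purchase is kept, and its matches are collected into an array.
function lookupInventory(purchases, inventory) {
  return purchases.map((purchase) => ({
    ...purchase,
    inventory_details: inventory.filter((item) => item._id === purchase.sku_code)
  }));
}

const joined = lookupInventory(samplePurchases, sampleInventory);
console.log(JSON.stringify(joined, null, 2));
```

Both purchases appear in the result: the first carries a one-element inventory_details array, and the second carries an empty array rather than being dropped.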
$limit: Constraining Output
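The $limit stage passes the first N documents to the next pipeline stage and discards the rest. Because "first" depends on the order of the incoming stream, $limit is typically placed after a $sort stage when a specific top-N result is wanted.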
db.transactions.aggregate([
{ $limit: 5 }
]);
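Putting the stages from this article together, the sketch below asks for the five highest-revenue categories since a cutoff date. It only combines expressions already shown above; the variable name topFiveCategories is illustrative.

```javascript
// `topFiveCategories` is an illustrative name.
const topFiveCategories = [
  // 1. Keep only recent transactions.
  { $match: { created_at: { $gte: new Date("2022-06-01") } } },
  // 2. Total the revenue per category.
  {
    $group: {
      _id: "$category",
      totalRevenue: { $sum: { $multiply: ["$units_sold", "$unit_price"] } }
    }
  },
  // 3. Order categories from highest to lowest revenue.
  { $sort: { totalRevenue: -1 } },
  // 4. Keep the first five documents of the sorted stream.
  { $limit: 5 }
];

// Runs only where `db` exists (for example in mongosh); skipped in plain Node.js.
if (typeof db !== "undefined") {
  printjson(db.transactions.aggregate(topFiveCategories).toArray());
}
```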