Die Aggregation-Pipeline ist MongoDB’s mächtigstes Query-Tool.
Während find() Dokumente filtert und returned,
transformiert, gruppiert, joined und analysiert die Aggregation-Pipeline
Daten in komplexen Multi-Stage-Workflows. Man kann Summen berechnen,
Durchschnitte ermitteln, Daten aus mehreren Collections joinen,
Time-Series-Analysen durchführen, komplexe Transformationen vornehmen –
alles innerhalb der Datenbank, ohne Daten zu Application-Code zu
transferieren.
Für Analytics-Workloads – Reports, Dashboards, Business-Intelligence
– ist die Aggregation-Pipeline essentiell. Ein typisches Pattern: Eine
E-Commerce-Site will “Top 10 Products by Revenue last month, grouped by
category”. Mit find() müsste man alle Orders fetchen, im
Application-Code filtern, groupen, sortieren – ineffizient und langsam.
Mit Aggregation passiert alles in MongoDB, nah an den Daten, mit
Index-Optimierung und Parallelisierung.
Aber die Pipeline ist komplex. Sie hat dutzende Stages, hunderte Operators, subtile Performance-Charakteristiken. Eine schlecht geschriebene Pipeline kann langsamer sein als Application-Code. Eine gut geschriebene Pipeline nutzt Indexes optimal, minimiert Daten-Movement und parallelisiert auf Shards. Dieses Kapitel navigiert durch die Aggregation-Landschaft systematisch, von Basics bis zu fortgeschrittenen Patterns.
Die Aggregation-Pipeline ist ein Array von Stages. Jede Stage nimmt Dokumente als Input, transformiert sie und passed sie zur nächsten Stage. Dies ist funktionales Programming – jede Stage ist eine reine Funktion, der Output einer Stage ist der Input der nächsten.
db.orders.aggregate([
// Stage 1: Filter
{ $match: { status: "completed" } },
// Stage 2: Group und aggregate
{ $group: {
_id: "$customerId",
totalSpent: { $sum: "$amount" },
orderCount: { $sum: 1 }
}},
// Stage 3: Sort
{ $sort: { totalSpent: -1 } },
// Stage 4: Limit
{ $limit: 10 }
])Diese Pipeline: 1. Filtert completed Orders ($match) 2.
Gruppiert nach Customer, summiert amount und count ($group)
3. Sortiert nach totalSpent descending ($sort) 4. Limitiert
auf Top 10 ($limit)
Das Resultat: Die Top-10-Kunden nach Gesamt-Ausgaben. Jede Stage reduziert oder transformiert die Daten – die Pipeline ist eine Daten-Verarbeitungs-Fließband.
Jede Stage reduziert die Datenmenge (ideally), was nachfolgende
Stages effizienter macht. Die Reihenfolge ist kritisch –
$match früh reduziert früh, $sort spät auf
wenigen Dokumenten ist günstiger als früh auf vielen.
Die $match-Stage filtert Dokumente analog zu
find(). Die Syntax ist identisch:
db.orders.aggregate([
{ $match: {
orderDate: { $gte: new Date("2024-01-01") },
status: "completed"
}}
])Warum $match früh platzieren:
$match als erste Stage kann Indexes nutzen, genauso wie
find(). Dies ist fundamental für Performance:
// Gut: $match nutzt Index
db.orders.aggregate([
{ $match: { customerId: "CUST-123" } }, // Index-Scan
{ $unwind: "$items" },
{ $group: { _id: "$items.category", total: { $sum: "$items.price" } } }
])
// Schlecht: $match nach $unwind, kein Index
db.orders.aggregate([
{ $unwind: "$items" },
{ $match: { "items.category": "electronics" } }, // Table-Scan nach Unwind
{ $group: { _id: "$customerId", total: { $sum: "$items.price" } } }
])Die erste Pipeline scannt nur Orders von CUST-123 (wenige), dann unwinds. Die zweite unwinds alle Orders (viele), dann filtert – massiv ineffizient.
Multiple $match Stages:
Man kann mehrere $match-Stages haben – frühe für
Index-Nutzung, späte für Conditions auf transformierten Daten:
db.orders.aggregate([
{ $match: { status: "completed" } }, // Early filter, Index
{ $group: {
_id: "$customerId",
totalSpent: { $sum: "$amount" }
}},
{ $match: { totalSpent: { $gte: 1000 } } } // Late filter, auf aggregiertem Wert
])Die zweite $match filtert auf totalSpent,
ein Feld das erst durch $group existiert. Dies ist valide
und oft nötig.
Die $group-Stage ist das Herz vieler Pipelines. Sie
gruppiert Dokumente nach einem Key und berechnet aggregierte Werte pro
Gruppe.
db.sales.aggregate([
{ $group: {
_id: "$productCategory", // Group-by-Field
totalRevenue: { $sum: "$amount" }, // Sum amount per category
avgPrice: { $avg: "$amount" }, // Average
count: { $sum: 1 }, // Count documents
products: { $addToSet: "$productId" } // Unique products
}}
])Das _id-Feld definiert den Gruppierungs-Key. Alle
Dokumente mit demselben _id-Value werden zusammengefasst.
Für jede Gruppe werden die spezifizierten Akkumulator-Expressions
berechnet.
Akkumulator-Operatoren:
| Operator | Zweck | Beispiel |
|---|---|---|
$sum |
Summiert Werte | $sum: "$price" oder $sum: 1 (count) |
$avg |
Durchschnitt | $avg: "$rating" |
$min / $max |
Minimum/Maximum | $min: "$price" |
$first / $last |
Erstes/Letztes Dokument | $first: "$createdAt" |
$push |
Alle Werte in Array | $push: "$productId" |
$addToSet |
Unique Werte in Array | $addToSet: "$tag" |
$stdDevPop |
Standardabweichung | $stdDevPop: "$score" |
Compound Group Keys:
Für Gruppierung nach mehreren Feldern nutzt man ein Object als
_id:
db.sales.aggregate([
{ $group: {
_id: {
category: "$productCategory",
region: "$customerRegion"
},
revenue: { $sum: "$amount" }
}}
])Dies gruppiert nach Kombination von Category und Region. Das Resultat
hat _id-Objects wie
{ category: "Electronics", region: "EU" }.
Group ohne Key – Totale Aggregation:
Um über alle Dokumente zu aggregieren ohne Gruppierung:
db.orders.aggregate([
{ $group: {
_id: null, // Keine Gruppierung
totalOrders: { $sum: 1 },
totalRevenue: { $sum: "$amount" },
avgOrderValue: { $avg: "$amount" }
}}
])Das Resultat ist ein einzelnes Dokument mit Totals für die gesamte Collection.
Die $project-Stage selektiert, transformiert oder
erstellt Felder. Sie ist analog zur Projection in find(),
aber viel mächtiger.
db.users.aggregate([
{ $project: {
_id: 0, // Exclude _id
username: 1, // Include username
fullName: {
$concat: ["$firstName", " ", "$lastName"] // Computed field
},
accountAge: {
$dateDiff: {
startDate: "$createdAt",
endDate: "$$NOW",
unit: "day"
}
}
}}
])Dies inkludiert username, computed fullName
von firstName+lastName, und berechnet accountAge in
Tagen.
Inclusion vs. Exclusion:
Wie bei find()-Projection kann man nicht inclusion und
exclusion mixen (außer für _id):
// Valide: Nur inclusion
{ $project: { username: 1, email: 1 } }
// Valide: Nur exclusion
{ $project: { password: 0, internalId: 0 } }
// Invalid: Mix (außer _id)
{ $project: { username: 1, password: 0 } } // ErrorNested Field Projection:
{ $project: {
"address.city": 1,
"address.country": 1
}}Oder reshaping:
{ $project: {
city: "$address.city",
country: "$address.country"
}}Dies flacht die Struktur – address.city wird zu
top-level city.
Die $sort-Stage sortiert Dokumente:
db.products.aggregate([
{ $sort: { price: -1 } } // Descending by price
])Für Multi-Field-Sort:
{ $sort: { category: 1, price: -1 } }
// Erst nach category ascending, dann price descendingPerformance-Kritisch:
$sort ist Memory-intensive. MongoDB lädt Dokumente in
RAM zum Sortieren. Für große Resultsets (Hundertausende Dokumente) kann
dies RAM-Limits überschreiten (100 MB per Stage-Default).
Die Lösung: Index-backed Sort. Wenn ein Index auf dem Sort-Field
existiert und $sort früh in der Pipeline ist (vor
transformierenden Stages), kann MongoDB den Index in Sort-Order
traversieren – kein RAM-intensives Sort nötig:
// Index exists on createdAt
db.createIndex({ createdAt: -1 })
// Pipeline nutzt Index für Sort
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $sort: { createdAt: -1 } }, // Index-backed
{ $limit: 100 }
])Aber nach transformierenden Stages (wie $group,
$unwind, $project mit Computed-Fields) sind
Indexes nicht mehr anwendbar – Sort muss in-memory passieren.
$limit für Effizienz:
$limit nach $sort ist common für
Top-N-Queries:
db.products.aggregate([
{ $sort: { salesCount: -1 } },
{ $limit: 10 }
])MongoDB optimiert $sort + $limit – es braucht nicht alle
Dokumente zu sortieren, nur genug um die Top-10 zu finden. Dies ist
effizienter als Sort-All-Then-Limit.
Die $unwind-Stage “entfaltet” Arrays – jedes
Array-Element wird zu einem separaten Dokument:
// Input document
{
orderId: "ORD-123",
items: [
{ productId: "A", price: 10 },
{ productId: "B", price: 20 }
]
}
// After $unwind: "$items"
// Dokument 1
{
orderId: "ORD-123",
items: { productId: "A", price: 10 }
}
// Dokument 2
{
orderId: "ORD-123",
items: { productId: "B", price: 20 }
}Dies ist nützlich für Aggregationen auf Array-Elementen:
db.orders.aggregate([
{ $unwind: "$items" },
{ $group: {
_id: "$items.productId",
totalSold: { $sum: "$items.quantity" },
revenue: { $sum: { $multiply: ["$items.quantity", "$items.price"] } }
}}
])Ohne $unwind könnte man nicht über alle Items aller
Orders aggregieren – man müsste Application-Code nutzen.
preserveNullAndEmptyArrays:
Per Default entfernt $unwind Dokumente, wo das Array
fehlt oder leer ist. Mit preserveNullAndEmptyArrays: true
bleiben sie:
{ $unwind: {
path: "$items",
preserveNullAndEmptyArrays: true
}}Dies ist wichtig für Outer-Join-ähnliche Semantik – man will Dokumente behalten, selbst wenn sie keine Array-Elemente haben.
Die $lookup-Stage performed Left-Outer-Joins zwischen
Collections:
db.orders.aggregate([
{
$lookup: {
from: "customers", // Collection to join
localField: "customerId", // Field in orders
foreignField: "_id", // Field in customers
as: "customerInfo" // Output array field
}
}
])Für jede Order fetched MongoDB das matching Customer-Dokument aus
customers und embeddet es als
customerInfo-Array. Wenn kein Match existiert, ist
customerInfo ein leeres Array.
Resultat-Struktur:
{
orderId: "ORD-123",
customerId: "CUST-456",
amount: 100,
customerInfo: [
{
_id: "CUST-456",
name: "Alice Smith",
email: "alice@example.com"
}
]
}Typischerweise unwinds man das Resultat-Array:
db.orders.aggregate([
{ $lookup: { from: "customers", localField: "customerId", foreignField: "_id", as: "customerInfo" } },
{ $unwind: "$customerInfo" },
{ $project: {
orderId: 1,
amount: 1,
customerName: "$customerInfo.name",
customerEmail: "$customerInfo.email"
}}
])Pipeline $lookup (MongoDB 3.6+):
Für komplexere Joins mit Conditions oder Sub-Pipelines:
db.orders.aggregate([
{
$lookup: {
from: "products",
let: { orderItems: "$items" }, // Variables for sub-pipeline
pipeline: [
{ $match: {
$expr: { $in: ["$_id", "$$orderItems.productId"] }
}},
{ $project: { name: 1, category: 1 } }
],
as: "productDetails"
}
}
])Dies erlaubt Filtering und Transformation im Join selbst – sehr mächtig für komplexe Daten-Enrichment.
Example 1: Sales Report by Category and Month
db.sales.aggregate([
// Filter auf 2024
{ $match: {
saleDate: {
$gte: new Date("2024-01-01"),
$lt: new Date("2025-01-01")
}
}},
// Extract year und month
{ $project: {
category: 1,
amount: 1,
year: { $year: "$saleDate" },
month: { $month: "$saleDate" }
}},
// Group by category und month
{ $group: {
_id: {
category: "$category",
year: "$year",
month: "$month"
},
revenue: { $sum: "$amount" },
transactionCount: { $sum: 1 }
}},
// Sort
{ $sort: {
"_id.year": 1,
"_id.month": 1,
"_id.category": 1
}}
])Dies produziert einen Monthly-Revenue-Report per Category – perfect für Dashboards.
Example 2: Customer Lifetime Value (CLV)
db.orders.aggregate([
// Group by customer
{ $group: {
_id: "$customerId",
totalSpent: { $sum: "$amount" },
orderCount: { $sum: 1 },
firstOrder: { $min: "$orderDate" },
lastOrder: { $max: "$orderDate" }
}},
// Join with customer details
{ $lookup: {
from: "customers",
localField: "_id",
foreignField: "_id",
as: "customer"
}},
{ $unwind: "$customer" },
// Calculate CLV metrics
{ $project: {
customerName: "$customer.name",
email: "$customer.email",
totalSpent: 1,
orderCount: 1,
avgOrderValue: { $divide: ["$totalSpent", "$orderCount"] },
customerLifetimeDays: {
$dateDiff: {
startDate: "$firstOrder",
endDate: "$lastOrder",
unit: "day"
}
}
}},
// Sort by total spent
{ $sort: { totalSpent: -1 } },
{ $limit: 100 }
])Dies gibt die Top-100-Kunden nach CLV mit detaillierten Metriken.
Example 3: Inventory Low-Stock Alert
db.inventory.aggregate([
// Join mit product details
{ $lookup: {
from: "products",
localField: "productId",
foreignField: "_id",
as: "product"
}},
{ $unwind: "$product" },
// Calculate stock ratio
{ $project: {
productName: "$product.name",
category: "$product.category",
currentStock: "$quantity",
reorderLevel: "$product.reorderLevel",
stockRatio: {
$divide: ["$quantity", "$product.reorderLevel"]
}
}},
// Filter low stock (< 50% of reorder level)
{ $match: { stockRatio: { $lt: 0.5 } } },
// Sort by urgency
{ $sort: { stockRatio: 1 } }
])Dies identifiziert Produkte mit kritischem Stock-Level für Procurement.
Aggregation-Performance hängt fundamental von Index-Nutzung und Stage-Ordering ab.
Rule 1: $match und $sort früh, mit Indexes
// Gut: Match nutzt Index
db.orders.aggregate([
{ $match: { customerId: "CUST-123" } }, // Index-backed
{ $unwind: "$items" },
{ $group: { ... } }
])
// Schlecht: Match nach transformierenden Stages
db.orders.aggregate([
{ $unwind: "$items" },
{ $group: { ... } },
{ $match: { ... } } // Kein Index, viele Dokumente
])Rule 2: Reduziere Daten früh
Jede Stage, die Dokumente reduziert (Filter, Limit) sollte früh stehen, um nachfolgende Stages effizienter zu machen:
db.logs.aggregate([
{ $match: { level: "error", date: { $gte: yesterday } } }, // Reduziert stark
{ $lookup: { from: "users", ... } }, // Weniger Lookups nötig
{ $group: { ... } }
])Rule 3: Vermeide $lookup auf große Collections ohne Filter
$lookup ist teuer – es performed N Queries (N = Anzahl
Dokumente im Pipeline). Für große N ist dies katastrophal. Filter vor
$lookup:
// Besser
db.orders.aggregate([
{ $match: { status: "pending" } }, // Nur 100 Orders
{ $lookup: { from: "customers", ... } } // 100 Lookups
])
// Als
db.orders.aggregate([
{ $lookup: { from: "customers", ... } }, // 1 Million Lookups
{ $match: { status: "pending" } }
])Explain für Aggregations:
db.orders.aggregate([...], { explain: true })Die Explain-Output zeigt, welche Stages Indexes nutzen, wie viele Dokumente jede Stage processed, Memory-Usage, etc. Für Performance-Tuning ist dies essentiell.
Aggregation-Stages haben ein 100-MB-RAM-Limit per Stage. Für große Pipelines – etwa Sort auf Millionen Dokumenten – kann dies überschritten werden:
Error: exceeded memory limit for $sort
Die Lösung: allowDiskUse: true:
db.orders.aggregate([
{ $sort: { amount: -1 } },
{ $limit: 1000 }
], { allowDiskUse: true })Dies erlaubt MongoDB, Temp-Files auf Disk zu nutzen für Memory-intensive Stages. Es ist langsamer als RAM, aber verhindert Out-of-Memory-Errors.
Trade-off:
allowDiskUse macht die Pipeline langsamer (Disk-I/O
vs. RAM). Besser: Pipeline so designen, dass sie im RAM bleibt – etwa
durch frühe Filter oder Index-backed Sorts.
In Sharded Clusters werden Aggregation-Pipelines parallelisiert. Jeder Shard executed die Pipeline auf seine lokalen Daten, mongos merged die Results.
Shard-Targeted vs. Scatter-Gather:
Wenn die Pipeline einen $match auf dem Shard-Key hat,
kann mongos die Pipeline nur an relevante Shards routen:
// Shard-Key ist customerId
db.orders.aggregate([
{ $match: { customerId: "CUST-123" } }, // Nur ein Shard involviert
{ $group: { ... } }
])Ohne Shard-Key im $match muss die Pipeline an alle
Shards gehen (Scatter-Gather), was langsamer ist.
Stages die nicht parallelisierbar sind:
Manche Stages müssen auf mongos laufen, nicht auf Shards: -
$lookup (wenn joining from nicht-sharded Collection) -
$group ohne Shard-Key in _id -
$sort (finale Merge-Sort auf mongos)
Dies kann Performance beeinträchtigen – mongos wird zum Bottleneck. Für extreme-scale Analytics sollte man Pipelines designen, die maximal auf Shards parallelisieren.
Die folgende Tabelle fasst wichtige Stages zusammen:
| Stage | Zweck | Index-Nutzung | Memory-Impact |
|---|---|---|---|
$match |
Filter | Ja (wenn früh) | Niedrig |
$project |
Feld-Transformation | Nein | Niedrig |
$group |
Aggregation | Nein | Hoch (ohne allowDiskUse) |
$sort |
Sortierung | Ja (wenn früh + Index) | Hoch |
$limit |
Result-Limit | - | Niedrig |
$skip |
Pagination | - | Niedrig |
$unwind |
Array entfalten | Nein | Mittel |
$lookup |
Join | Nein | Hoch (N queries) |
$out / $merge |
Output zu Collection | - | Niedrig |
Die Aggregation-Pipeline ist MongoDB’s Analytics-Powerhouse. Sie erlaubt komplexe Datenverarbeitung direkt in der Datenbank mit Syntax, die deutlich flexibler ist als SQL’s GROUP BY. Aber diese Flexibilität kommt mit Komplexität – Performance hängt fundamental von Stage-Ordering, Index-Nutzung und Pipeline-Design ab. Eine naive Pipeline kann Minuten laufen, eine optimierte Sekunden. Die Best Practice: Start mit klarem Verständnis der Daten und gewünschten Outputs, designe die Pipeline mit Performance im Kopf (frühe Filter, Index-backed Sorts, minimal Stages), teste mit Explain, und iteriere. Mit korrektem Design ist die Aggregation-Pipeline nicht nur mächtig, sondern auch performant – selbst auf Hunderten Millionen Dokumenten.