37 Aggregation Framework: MongoDB’s Analytics-Engine

Die Aggregation-Pipeline ist MongoDB’s mächtigstes Query-Tool. Während find() Dokumente filtert und returned, transformiert, gruppiert, joined und analysiert die Aggregation-Pipeline Daten in komplexen Multi-Stage-Workflows. Man kann Summen berechnen, Durchschnitte ermitteln, Daten aus mehreren Collections joinen, Time-Series-Analysen durchführen, komplexe Transformationen vornehmen – alles innerhalb der Datenbank, ohne Daten zu Application-Code zu transferieren.

Für Analytics-Workloads – Reports, Dashboards, Business-Intelligence – ist die Aggregation-Pipeline essentiell. Ein typisches Pattern: Eine E-Commerce-Site will “Top 10 Products by Revenue last month, grouped by category”. Mit find() müsste man alle Orders fetchen, im Application-Code filtern, groupen, sortieren – ineffizient und langsam. Mit Aggregation passiert alles in MongoDB, nah an den Daten, mit Index-Optimierung und Parallelisierung.

Aber die Pipeline ist komplex. Sie hat dutzende Stages, hunderte Operators, subtile Performance-Charakteristiken. Eine schlecht geschriebene Pipeline kann langsamer sein als Application-Code. Eine gut geschriebene Pipeline nutzt Indexes optimal, minimiert Daten-Movement und parallelisiert auf Shards. Dieses Kapitel navigiert durch die Aggregation-Landschaft systematisch, von Basics bis zu fortgeschrittenen Patterns.

37.1 Pipeline-Architektur: Stages als Transformations-Kette

Die Aggregation-Pipeline ist ein Array von Stages. Jede Stage nimmt Dokumente als Input, transformiert sie und passed sie zur nächsten Stage. Dies ist funktionales Programming – jede Stage ist eine reine Funktion, der Output einer Stage ist der Input der nächsten.

db.orders.aggregate([
  // Stage 1: Filter
  { $match: { status: "completed" } },
  
  // Stage 2: Group und aggregate
  { $group: {
      _id: "$customerId",
      totalSpent: { $sum: "$amount" },
      orderCount: { $sum: 1 }
  }},
  
  // Stage 3: Sort
  { $sort: { totalSpent: -1 } },
  
  // Stage 4: Limit
  { $limit: 10 }
])

Diese Pipeline: 1. Filtert completed Orders ($match) 2. Gruppiert nach Customer, summiert amount und count ($group) 3. Sortiert nach totalSpent descending ($sort) 4. Limitiert auf Top 10 ($limit)

Das Resultat: Die Top-10-Kunden nach Gesamt-Ausgaben. Jede Stage reduziert oder transformiert die Daten – die Pipeline ist eine Daten-Verarbeitungs-Fließband.

Jede Stage reduziert die Datenmenge (ideally), was nachfolgende Stages effizienter macht. Die Reihenfolge ist kritisch – $match früh reduziert früh, $sort spät auf wenigen Dokumenten ist günstiger als früh auf vielen.

37.2 $match: Filter für Pipeline-Eingang

Die $match-Stage filtert Dokumente analog zu find(). Die Syntax ist identisch:

db.orders.aggregate([
  { $match: { 
      orderDate: { $gte: new Date("2024-01-01") },
      status: "completed"
  }}
])

Warum $match früh platzieren:

$match als erste Stage kann Indexes nutzen, genauso wie find(). Dies ist fundamental für Performance:

// Gut: $match nutzt Index
db.orders.aggregate([
  { $match: { customerId: "CUST-123" } },  // Index-Scan
  { $unwind: "$items" },
  { $group: { _id: "$items.category", total: { $sum: "$items.price" } } }
])

// Schlecht: $match nach $unwind, kein Index
db.orders.aggregate([
  { $unwind: "$items" },
  { $match: { "items.category": "electronics" } },  // Table-Scan nach Unwind
  { $group: { _id: "$customerId", total: { $sum: "$items.price" } } }
])

Die erste Pipeline scannt nur Orders von CUST-123 (wenige), dann unwinds. Die zweite unwinds alle Orders (viele), dann filtert – massiv ineffizient.

Multiple $match Stages:

Man kann mehrere $match-Stages haben – frühe für Index-Nutzung, späte für Conditions auf transformierten Daten:

db.orders.aggregate([
  { $match: { status: "completed" } },          // Early filter, Index
  { $group: { 
      _id: "$customerId", 
      totalSpent: { $sum: "$amount" }
  }},
  { $match: { totalSpent: { $gte: 1000 } } }   // Late filter, auf aggregiertem Wert
])

Die zweite $match filtert auf totalSpent, ein Feld das erst durch $group existiert. Dies ist valide und oft nötig.

37.3 $group: Aggregation und Statistiken

Die $group-Stage ist das Herz vieler Pipelines. Sie gruppiert Dokumente nach einem Key und berechnet aggregierte Werte pro Gruppe.

db.sales.aggregate([
  { $group: {
      _id: "$productCategory",           // Group-by-Field
      totalRevenue: { $sum: "$amount" }, // Sum amount per category
      avgPrice: { $avg: "$amount" },     // Average
      count: { $sum: 1 },                // Count documents
      products: { $addToSet: "$productId" }  // Unique products
  }}
])

Das _id-Feld definiert den Gruppierungs-Key. Alle Dokumente mit demselben _id-Value werden zusammengefasst. Für jede Gruppe werden die spezifizierten Akkumulator-Expressions berechnet.

Akkumulator-Operatoren:

Operator Zweck Beispiel
$sum Summiert Werte $sum: "$price" oder $sum: 1 (count)
$avg Durchschnitt $avg: "$rating"
$min / $max Minimum/Maximum $min: "$price"
$first / $last Erstes/Letztes Dokument $first: "$createdAt"
$push Alle Werte in Array $push: "$productId"
$addToSet Unique Werte in Array $addToSet: "$tag"
$stdDevPop Standardabweichung $stdDevPop: "$score"

Compound Group Keys:

Für Gruppierung nach mehreren Feldern nutzt man ein Object als _id:

db.sales.aggregate([
  { $group: {
      _id: { 
        category: "$productCategory",
        region: "$customerRegion"
      },
      revenue: { $sum: "$amount" }
  }}
])

Dies gruppiert nach Kombination von Category und Region. Das Resultat hat _id-Objects wie { category: "Electronics", region: "EU" }.

Group ohne Key – Totale Aggregation:

Um über alle Dokumente zu aggregieren ohne Gruppierung:

db.orders.aggregate([
  { $group: {
      _id: null,  // Keine Gruppierung
      totalOrders: { $sum: 1 },
      totalRevenue: { $sum: "$amount" },
      avgOrderValue: { $avg: "$amount" }
  }}
])

Das Resultat ist ein einzelnes Dokument mit Totals für die gesamte Collection.

37.4 $project: Felder transformieren und reshapen

Die $project-Stage selektiert, transformiert oder erstellt Felder. Sie ist analog zur Projection in find(), aber viel mächtiger.

db.users.aggregate([
  { $project: {
      _id: 0,                                    // Exclude _id
      username: 1,                               // Include username
      fullName: { 
        $concat: ["$firstName", " ", "$lastName"]  // Computed field
      },
      accountAge: { 
        $dateDiff: {
          startDate: "$createdAt",
          endDate: "$$NOW",
          unit: "day"
        }
      }
  }}
])

Dies inkludiert username, computed fullName von firstName+lastName, und berechnet accountAge in Tagen.

Inclusion vs. Exclusion:

Wie bei find()-Projection kann man nicht inclusion und exclusion mixen (außer für _id):

// Valide: Nur inclusion
{ $project: { username: 1, email: 1 } }

// Valide: Nur exclusion
{ $project: { password: 0, internalId: 0 } }

// Invalid: Mix (außer _id)
{ $project: { username: 1, password: 0 } }  // Error

Nested Field Projection:

{ $project: { 
    "address.city": 1,
    "address.country": 1
}}

Oder reshaping:

{ $project: {
    city: "$address.city",
    country: "$address.country"
}}

Dies flacht die Struktur – address.city wird zu top-level city.

37.5 $sort und $limit: Ordering und Pagination

Die $sort-Stage sortiert Dokumente:

db.products.aggregate([
  { $sort: { price: -1 } }  // Descending by price
])

Für Multi-Field-Sort:

{ $sort: { category: 1, price: -1 } }
// Erst nach category ascending, dann price descending

Performance-Kritisch:

$sort ist Memory-intensive. MongoDB lädt Dokumente in RAM zum Sortieren. Für große Resultsets (Hundertausende Dokumente) kann dies RAM-Limits überschreiten (100 MB per Stage-Default).

Die Lösung: Index-backed Sort. Wenn ein Index auf dem Sort-Field existiert und $sort früh in der Pipeline ist (vor transformierenden Stages), kann MongoDB den Index in Sort-Order traversieren – kein RAM-intensives Sort nötig:

// Index exists on createdAt
db.createIndex({ createdAt: -1 })

// Pipeline nutzt Index für Sort
db.orders.aggregate([
  { $match: { status: "completed" } },
  { $sort: { createdAt: -1 } },   // Index-backed
  { $limit: 100 }
])

Aber nach transformierenden Stages (wie $group, $unwind, $project mit Computed-Fields) sind Indexes nicht mehr anwendbar – Sort muss in-memory passieren.

$limit für Effizienz:

$limit nach $sort ist common für Top-N-Queries:

db.products.aggregate([
  { $sort: { salesCount: -1 } },
  { $limit: 10 }
])

MongoDB optimiert $sort + $limit – es braucht nicht alle Dokumente zu sortieren, nur genug um die Top-10 zu finden. Dies ist effizienter als Sort-All-Then-Limit.

37.6 $unwind: Arrays zu einzelnen Dokumenten

Die $unwind-Stage “entfaltet” Arrays – jedes Array-Element wird zu einem separaten Dokument:

// Input document
{
  orderId: "ORD-123",
  items: [
    { productId: "A", price: 10 },
    { productId: "B", price: 20 }
  ]
}

// After $unwind: "$items"
// Dokument 1
{
  orderId: "ORD-123",
  items: { productId: "A", price: 10 }
}
// Dokument 2
{
  orderId: "ORD-123",
  items: { productId: "B", price: 20 }
}

Dies ist nützlich für Aggregationen auf Array-Elementen:

db.orders.aggregate([
  { $unwind: "$items" },
  { $group: {
      _id: "$items.productId",
      totalSold: { $sum: "$items.quantity" },
      revenue: { $sum: { $multiply: ["$items.quantity", "$items.price"] } }
  }}
])

Ohne $unwind könnte man nicht über alle Items aller Orders aggregieren – man müsste Application-Code nutzen.

preserveNullAndEmptyArrays:

Per Default entfernt $unwind Dokumente, wo das Array fehlt oder leer ist. Mit preserveNullAndEmptyArrays: true bleiben sie:

{ $unwind: { 
    path: "$items",
    preserveNullAndEmptyArrays: true
}}

Dies ist wichtig für Outer-Join-ähnliche Semantik – man will Dokumente behalten, selbst wenn sie keine Array-Elemente haben.

37.7 $lookup: Joins zwischen Collections

Die $lookup-Stage performed Left-Outer-Joins zwischen Collections:

db.orders.aggregate([
  {
    $lookup: {
      from: "customers",              // Collection to join
      localField: "customerId",       // Field in orders
      foreignField: "_id",            // Field in customers
      as: "customerInfo"              // Output array field
    }
  }
])

Für jede Order fetched MongoDB das matching Customer-Dokument aus customers und embeddet es als customerInfo-Array. Wenn kein Match existiert, ist customerInfo ein leeres Array.

Resultat-Struktur:

{
  orderId: "ORD-123",
  customerId: "CUST-456",
  amount: 100,
  customerInfo: [
    {
      _id: "CUST-456",
      name: "Alice Smith",
      email: "alice@example.com"
    }
  ]
}

Typischerweise unwinds man das Resultat-Array:

db.orders.aggregate([
  { $lookup: { from: "customers", localField: "customerId", foreignField: "_id", as: "customerInfo" } },
  { $unwind: "$customerInfo" },
  { $project: {
      orderId: 1,
      amount: 1,
      customerName: "$customerInfo.name",
      customerEmail: "$customerInfo.email"
  }}
])

Pipeline $lookup (MongoDB 3.6+):

Für komplexere Joins mit Conditions oder Sub-Pipelines:

db.orders.aggregate([
  {
    $lookup: {
      from: "products",
      let: { orderItems: "$items" },  // Variables for sub-pipeline
      pipeline: [
        { $match: { 
            $expr: { $in: ["$_id", "$$orderItems.productId"] }
        }},
        { $project: { name: 1, category: 1 } }
      ],
      as: "productDetails"
    }
  }
])

Dies erlaubt Filtering und Transformation im Join selbst – sehr mächtig für komplexe Daten-Enrichment.

37.8 Practical Examples: Real-World-Aggregations

Example 1: Sales Report by Category and Month

db.sales.aggregate([
  // Filter auf 2024
  { $match: { 
      saleDate: { 
        $gte: new Date("2024-01-01"),
        $lt: new Date("2025-01-01")
      }
  }},
  
  // Extract year und month
  { $project: {
      category: 1,
      amount: 1,
      year: { $year: "$saleDate" },
      month: { $month: "$saleDate" }
  }},
  
  // Group by category und month
  { $group: {
      _id: {
        category: "$category",
        year: "$year",
        month: "$month"
      },
      revenue: { $sum: "$amount" },
      transactionCount: { $sum: 1 }
  }},
  
  // Sort
  { $sort: { 
      "_id.year": 1,
      "_id.month": 1,
      "_id.category": 1
  }}
])

Dies produziert einen Monthly-Revenue-Report per Category – perfect für Dashboards.

Example 2: Customer Lifetime Value (CLV)

db.orders.aggregate([
  // Group by customer
  { $group: {
      _id: "$customerId",
      totalSpent: { $sum: "$amount" },
      orderCount: { $sum: 1 },
      firstOrder: { $min: "$orderDate" },
      lastOrder: { $max: "$orderDate" }
  }},
  
  // Join with customer details
  { $lookup: {
      from: "customers",
      localField: "_id",
      foreignField: "_id",
      as: "customer"
  }},
  { $unwind: "$customer" },
  
  // Calculate CLV metrics
  { $project: {
      customerName: "$customer.name",
      email: "$customer.email",
      totalSpent: 1,
      orderCount: 1,
      avgOrderValue: { $divide: ["$totalSpent", "$orderCount"] },
      customerLifetimeDays: {
        $dateDiff: {
          startDate: "$firstOrder",
          endDate: "$lastOrder",
          unit: "day"
        }
      }
  }},
  
  // Sort by total spent
  { $sort: { totalSpent: -1 } },
  { $limit: 100 }
])

Dies gibt die Top-100-Kunden nach CLV mit detaillierten Metriken.

Example 3: Inventory Low-Stock Alert

db.inventory.aggregate([
  // Join mit product details
  { $lookup: {
      from: "products",
      localField: "productId",
      foreignField: "_id",
      as: "product"
  }},
  { $unwind: "$product" },
  
  // Calculate stock ratio
  { $project: {
      productName: "$product.name",
      category: "$product.category",
      currentStock: "$quantity",
      reorderLevel: "$product.reorderLevel",
      stockRatio: { 
        $divide: ["$quantity", "$product.reorderLevel"]
      }
  }},
  
  // Filter low stock (< 50% of reorder level)
  { $match: { stockRatio: { $lt: 0.5 } } },
  
  // Sort by urgency
  { $sort: { stockRatio: 1 } }
])

Dies identifiziert Produkte mit kritischem Stock-Level für Procurement.

37.9 Performance-Optimierung: Indexes und Stage-Ordering

Aggregation-Performance hängt fundamental von Index-Nutzung und Stage-Ordering ab.

Rule 1: $match und $sort früh, mit Indexes

// Gut: Match nutzt Index
db.orders.aggregate([
  { $match: { customerId: "CUST-123" } },  // Index-backed
  { $unwind: "$items" },
  { $group: { ... } }
])

// Schlecht: Match nach transformierenden Stages
db.orders.aggregate([
  { $unwind: "$items" },
  { $group: { ... } },
  { $match: { ... } }  // Kein Index, viele Dokumente
])

Rule 2: Reduziere Daten früh

Jede Stage, die Dokumente reduziert (Filter, Limit) sollte früh stehen, um nachfolgende Stages effizienter zu machen:

db.logs.aggregate([
  { $match: { level: "error", date: { $gte: yesterday } } },  // Reduziert stark
  { $lookup: { from: "users", ... } },   // Weniger Lookups nötig
  { $group: { ... } }
])

Rule 3: Vermeide $lookup auf große Collections ohne Filter

$lookup ist teuer – es performed N Queries (N = Anzahl Dokumente im Pipeline). Für große N ist dies katastrophal. Filter vor $lookup:

// Besser
db.orders.aggregate([
  { $match: { status: "pending" } },  // Nur 100 Orders
  { $lookup: { from: "customers", ... } }  // 100 Lookups
])

// Als
db.orders.aggregate([
  { $lookup: { from: "customers", ... } },  // 1 Million Lookups
  { $match: { status: "pending" } }
])

Explain für Aggregations:

db.orders.aggregate([...], { explain: true })

Die Explain-Output zeigt, welche Stages Indexes nutzen, wie viele Dokumente jede Stage processed, Memory-Usage, etc. Für Performance-Tuning ist dies essentiell.

37.10 Memory Limits und allowDiskUse

Aggregation-Stages haben ein 100-MB-RAM-Limit per Stage. Für große Pipelines – etwa Sort auf Millionen Dokumenten – kann dies überschritten werden:

Error: exceeded memory limit for $sort

Die Lösung: allowDiskUse: true:

db.orders.aggregate([
  { $sort: { amount: -1 } },
  { $limit: 1000 }
], { allowDiskUse: true })

Dies erlaubt MongoDB, Temp-Files auf Disk zu nutzen für Memory-intensive Stages. Es ist langsamer als RAM, aber verhindert Out-of-Memory-Errors.

Trade-off:

allowDiskUse macht die Pipeline langsamer (Disk-I/O vs. RAM). Besser: Pipeline so designen, dass sie im RAM bleibt – etwa durch frühe Filter oder Index-backed Sorts.

37.11 Aggregation in Sharded Clusters: Parallelisierung

In Sharded Clusters werden Aggregation-Pipelines parallelisiert. Jeder Shard executed die Pipeline auf seine lokalen Daten, mongos merged die Results.

Shard-Targeted vs. Scatter-Gather:

Wenn die Pipeline einen $match auf dem Shard-Key hat, kann mongos die Pipeline nur an relevante Shards routen:

// Shard-Key ist customerId
db.orders.aggregate([
  { $match: { customerId: "CUST-123" } },  // Nur ein Shard involviert
  { $group: { ... } }
])

Ohne Shard-Key im $match muss die Pipeline an alle Shards gehen (Scatter-Gather), was langsamer ist.

Stages die nicht parallelisierbar sind:

Manche Stages müssen auf mongos laufen, nicht auf Shards: - $lookup (wenn joining from nicht-sharded Collection) - $group ohne Shard-Key in _id - $sort (finale Merge-Sort auf mongos)

Dies kann Performance beeinträchtigen – mongos wird zum Bottleneck. Für extreme-scale Analytics sollte man Pipelines designen, die maximal auf Shards parallelisieren.

Die folgende Tabelle fasst wichtige Stages zusammen:

Stage Zweck Index-Nutzung Memory-Impact
$match Filter Ja (wenn früh) Niedrig
$project Feld-Transformation Nein Niedrig
$group Aggregation Nein Hoch (ohne allowDiskUse)
$sort Sortierung Ja (wenn früh + Index) Hoch
$limit Result-Limit - Niedrig
$skip Pagination - Niedrig
$unwind Array entfalten Nein Mittel
$lookup Join Nein Hoch (N queries)
$out / $merge Output zu Collection - Niedrig

Die Aggregation-Pipeline ist MongoDB’s Analytics-Powerhouse. Sie erlaubt komplexe Datenverarbeitung direkt in der Datenbank mit Syntax, die deutlich flexibler ist als SQL’s GROUP BY. Aber diese Flexibilität kommt mit Komplexität – Performance hängt fundamental von Stage-Ordering, Index-Nutzung und Pipeline-Design ab. Eine naive Pipeline kann Minuten laufen, eine optimierte Sekunden. Die Best Practice: Start mit klarem Verständnis der Daten und gewünschten Outputs, designe die Pipeline mit Performance im Kopf (frühe Filter, Index-backed Sorts, minimal Stages), teste mit Explain, und iteriere. Mit korrektem Design ist die Aggregation-Pipeline nicht nur mächtig, sondern auch performant – selbst auf Hunderten Millionen Dokumenten.