
[FEATURE] Add Per-Egress-Type Concurrency Limits to Improve Load Distribution #908

@mpisat

Description


Is your feature request related to a problem? Please describe.
I'm frustrated by frequent CPU exhaustion errors in the egress service, which terminate participant egresses unexpectedly:

```
2025-05-19T12:43:29.893Z ERROR egress service/process.go:175 killing egress {"nodeID": "NE_dxUEN5kbmipM", "clusterID": "", "egressID": "EG_q5csgbJkUmPm", "error": "CPU exhausted: 5.11 cores used"}
```

With 103 pods (12 CPU each), load distribution is uneven: some pods overload while others sit underutilized. The existing MaxConcurrentWeb limit (hardcoded to 18) only applies to Chrome-based egresses (RoomComposite and Web); participant egresses have no equivalent cap, so a pod can exceed a safe count (e.g., 3 tasks). Because 1080p recordings peak at 3–6 CPU cores but average only 1.5–2, a pod can accept too many tasks before hitting the cpuKillThreshold.
Describe the solution you'd like
Add per-egress-type concurrency limits to CPUCostConfig (e.g., MaxConcurrentParticipant, MaxConcurrentTrackComposite) in service.go, configurable via YAML, similar to MaxConcurrentWeb. For example, setting max_concurrent_participant: 3 would ensure no pod accepts more than 3 participant egresses, forcing the LiveKit server to distribute requests to other pods. This would complement CPU-based checks and improve load balancing across pods.
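A hypothetical config fragment showing how this could look, assuming the new keys follow the existing max_concurrent_web naming convention (the top-level key and exact names are illustrative, not confirmed against the current config schema):

```yaml
# Hypothetical egress config fragment (key names are illustrative)
cpu_cost:
  max_concurrent_web: 18            # existing Chrome-based limit
  max_concurrent_participant: 3     # proposed: cap participant egresses per pod
  max_concurrent_track_composite: 5 # proposed
  max_concurrent_track: 10          # proposed
```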

Describe alternatives you've considered
Tuning CPU costs: raising ParticipantCpuCost (default 2.0) to 3.0–5.0 accounts for peak usage, but it doesn't cap task counts, so distribution stays uneven when CPU usage fluctuates.

Additional context
The issue occurs in a 103-pod Kubernetes deployment with 4-core pods. Participant egresses (1080p) peak at 5–6 cores, triggering terminations when pods accept too many tasks. A per-type limit would ensure predictable distribution (e.g., 3 tasks × 103 pods = 309 participant egresses) and reduce ErrCPUExhausted errors.

Proposed Implementation

Update pkg/config/service.go: add per-type concurrency limits to CPUCostConfig and initialize defaults.

```go
const (
    maxConcurrentWeb            = 18 // existing default, referenced in InitDefaults below
    maxConcurrentRoomComposite  = 5
    maxConcurrentWebEgress      = 5
    maxConcurrentParticipant    = 3
    maxConcurrentTrackComposite = 5
    maxConcurrentTrack          = 10
)

type CPUCostConfig struct {
    // Existing fields
    MaxConcurrentWeb int32 `yaml:"max_concurrent_web"`
    // New fields
    MaxConcurrentRoomComposite  int32 `yaml:"max_concurrent_room_composite"`
    MaxConcurrentWebEgress      int32 `yaml:"max_concurrent_web_egress"`
    MaxConcurrentParticipant    int32 `yaml:"max_concurrent_participant"`
    MaxConcurrentTrackComposite int32 `yaml:"max_concurrent_track_composite"`
    MaxConcurrentTrack          int32 `yaml:"max_concurrent_track"`
    // ... other fields ...
}

func (c *ServiceConfig) InitDefaults() {
    // Existing code
    if c.CPUCostConfig.MaxConcurrentWeb <= 0 {
        c.CPUCostConfig.MaxConcurrentWeb = maxConcurrentWeb
    }
    // New defaults
    if c.CPUCostConfig.MaxConcurrentRoomComposite <= 0 {
        c.CPUCostConfig.MaxConcurrentRoomComposite = maxConcurrentRoomComposite
    }
    if c.CPUCostConfig.MaxConcurrentWebEgress <= 0 {
        c.CPUCostConfig.MaxConcurrentWebEgress = maxConcurrentWebEgress
    }
    if c.CPUCostConfig.MaxConcurrentParticipant <= 0 {
        c.CPUCostConfig.MaxConcurrentParticipant = maxConcurrentParticipant
    }
    if c.CPUCostConfig.MaxConcurrentTrackComposite <= 0 {
        c.CPUCostConfig.MaxConcurrentTrackComposite = maxConcurrentTrackComposite
    }
    if c.CPUCostConfig.MaxConcurrentTrack <= 0 {
        c.CPUCostConfig.MaxConcurrentTrack = maxConcurrentTrack
    }
    // ... other initializations ...
}
```

Update pkg/stats/monitor.go: add per-type counters and check the limits in CanAcceptRequest.

```go
type Monitor struct {
    // Existing fields
    webRequests atomic.Int32
    // New per-type counters
    activeRoomCompositeCounts  atomic.Int32
    activeWebEgressCounts      atomic.Int32
    activeParticipantCounts    atomic.Int32
    activeTrackCompositeCounts atomic.Int32
    activeTrackCounts          atomic.Int32
    // ... other fields ...
}

func (m *Monitor) CanAcceptRequest(req *rpc.StartEgressRequest) bool {
    // Check per-type limits first
    switch req.Request.(type) {
    case *rpc.StartEgressRequest_RoomComposite:
        if m.activeRoomCompositeCounts.Load() >= m.cpuCostConfig.MaxConcurrentRoomComposite {
            logger.Debugw("room composite limit reached", "egressID", req.EgressId, "limit", m.cpuCostConfig.MaxConcurrentRoomComposite, "current", m.activeRoomCompositeCounts.Load())
            return false
        }
    case *rpc.StartEgressRequest_Web:
        if m.activeWebEgressCounts.Load() >= m.cpuCostConfig.MaxConcurrentWebEgress {
            logger.Debugw("web egress limit reached", "egressID", req.EgressId, "limit", m.cpuCostConfig.MaxConcurrentWebEgress, "current", m.activeWebEgressCounts.Load())
            return false
        }
    case *rpc.StartEgressRequest_Participant:
        if m.activeParticipantCounts.Load() >= m.cpuCostConfig.MaxConcurrentParticipant {
            logger.Debugw("participant limit reached", "egressID", req.EgressId, "limit", m.cpuCostConfig.MaxConcurrentParticipant, "current", m.activeParticipantCounts.Load())
            return false
        }
    case *rpc.StartEgressRequest_TrackComposite:
        if m.activeTrackCompositeCounts.Load() >= m.cpuCostConfig.MaxConcurrentTrackComposite {
            logger.Debugw("track composite limit reached", "egressID", req.EgressId, "limit", m.cpuCostConfig.MaxConcurrentTrackComposite, "current", m.activeTrackCompositeCounts.Load())
            return false
        }
    case *rpc.StartEgressRequest_Track:
        if m.activeTrackCounts.Load() >= m.cpuCostConfig.MaxConcurrentTrack {
            logger.Debugw("track limit reached", "egressID", req.EgressId, "limit", m.cpuCostConfig.MaxConcurrentTrack, "current", m.activeTrackCounts.Load())
            return false
        }
    }

    // Check CPU and global web limits
    m.mu.Lock()
    fields, canAccept := m.canAcceptRequestLocked(req)
    m.mu.Unlock()
    if !canAccept {
        logger.Debugw("cpu/web limit check failed", append(fields, "egressID", req.EgressId)...)
        return false
    }

    logger.Debugw("all checks passed", append(fields, "egressID", req.EgressId)...)
    return true
}

func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error {
    if !m.CanAcceptRequest(req) {
        return errors.ErrNotEnoughCPU
    }
    // ... increment the appropriate counter (e.g., m.activeParticipantCounts.Add(1)) ...
    // Existing logic for CPU hold, pending map, etc.
    return nil
}

func (m *Monitor) EgressAborted(req *rpc.StartEgressRequest) {
    // ... decrement the appropriate counter (e.g., m.activeParticipantCounts.Add(-1)) ...
}

func (m *Monitor) EgressEnded(req *rpc.StartEgressRequest) (float64, float64, int) {
    // ... decrement the appropriate counter and update Prometheus gauges ...
}
```
