[CELEBORN-1577][BUG] Quota cancel shuffle should use app shuffle id#3662
s0nskar wants to merge 5 commits into apache:main
cc: @leixm PTAL
Pull request overview
This PR updates quota-triggered shuffle/stage cancellation to use Spark’s app shuffle ID (the one understood by DAGScheduler) by tracking a mapping from Celeborn-generated shuffle IDs to app shuffle IDs.
Changes:
- Introduced a `celebornShuffleId -> appShuffleId` mapping in `LifecycleManager`.
- Populated the mapping when generating new Celeborn shuffle IDs.
- Updated `cancelAllActiveStages` to translate active Celeborn shuffle IDs to app shuffle IDs before invoking the Spark cancel callback.
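The three changes above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: `QuotaCancelSketch`, `newCelebornShuffleId`, `cancelAll` and the `Int => Unit` callback shape are hypothetical names chosen for the example, and the `distinct` step assumes that several Celeborn shuffle IDs can map to the same app shuffle ID.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

object QuotaCancelSketch {
  private val shuffleIdGenerator = new AtomicInteger(0)

  // Hypothetical stand-in for the celebornShuffleId -> appShuffleId map.
  // Boxed Integer values make a missing key show up as null on get(...).
  private val celebornToApp = new ConcurrentHashMap[Integer, Integer]()

  // Record the owning app shuffle id whenever a new Celeborn id is handed out.
  def newCelebornShuffleId(appShuffleId: Int): Int = {
    val celebornShuffleId = shuffleIdGenerator.getAndIncrement()
    celebornToApp.put(celebornShuffleId, appShuffleId)
    celebornShuffleId
  }

  // Translate active Celeborn ids to app ids before calling the cancel callback.
  // Ids without a mapping are skipped; duplicates are collapsed (assumption).
  def cancelAll(activeCelebornIds: Seq[Int], cancel: Int => Unit): Unit =
    activeCelebornIds
      .flatMap(id => Option(celebornToApp.get(id)).map(_.intValue))
      .distinct
      .foreach(cancel)
}
```

With this shape the callback only ever sees IDs that the scheduler side understands, and an unmapped Celeborn ID is ignored rather than passed through.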
.filter(celebornShuffleIdToAppShuffleIdMap.contains(_))
.map(celebornShuffleIdToAppShuffleIdMap.get(_))
`ConcurrentHashMap.contains(...)` checks for a value (it is a legacy method equivalent to `containsValue`), not a key. Here we need to filter by whether the Celeborn shuffle ID exists as a key in `celebornShuffleIdToAppShuffleIdMap`; otherwise quota cancellation will frequently skip active shuffles. Use `containsKey` (or a single `get` plus a null/`Option` check) before mapping to the app shuffle ID.
Suggested change:
- .filter(celebornShuffleIdToAppShuffleIdMap.contains(_))
- .map(celebornShuffleIdToAppShuffleIdMap.get(_))
+ .flatMap(shuffleId => Option(celebornShuffleIdToAppShuffleIdMap.get(shuffleId)))
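To make the difference concrete, here is a small standalone sketch. The map contents and the name `ContainsVsContainsKey` are made up for illustration, and the map is declared with boxed `Integer` values (an assumption, unlike the PR's `[Int, Int]`) so that a missing key is unambiguously `null`.

```scala
import java.util.concurrent.ConcurrentHashMap

object ContainsVsContainsKey {
  // Hypothetical mapping: Celeborn shuffle id (key) -> app shuffle id (value).
  val celebornToApp = new ConcurrentHashMap[Integer, Integer]()
  celebornToApp.put(7, 2)
  celebornToApp.put(8, 3)

  // Celeborn shuffle ids that are currently active; 9 has no mapping.
  val activeCelebornIds: Seq[Int] = Seq(7, 8, 9)

  // contains(...) tests VALUES: 7, 8 and 9 are not values, so nothing survives.
  val viaContains: Seq[Int] =
    activeCelebornIds.filter(id => celebornToApp.contains(id))

  // containsKey(...) tests KEYS: 7 and 8 survive and map to 2 and 3.
  val viaContainsKey: Seq[Int] =
    activeCelebornIds
      .filter(id => celebornToApp.containsKey(id))
      .map(id => celebornToApp.get(id).intValue)

  // Single lookup: get(...) returns null for a missing key, Option drops it.
  val viaOption: Seq[Int] =
    activeCelebornIds.flatMap(id => Option(celebornToApp.get(id)).map(_.intValue))
}
```

The single-lookup form also avoids the small window between `containsKey` and `get` in which another thread could remove the entry.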
private val shuffleIdMapping = JavaUtils.newConcurrentHashMap[
  Int,
  scala.collection.mutable.LinkedHashMap[String, (Int, Boolean)]]()
private val celebornShuffleIdToAppShuffleIdMap = JavaUtils.newConcurrentHashMap[Int, Int]()
private val shuffleIdGenerator = new AtomicInteger(0)
celebornShuffleIdToAppShuffleIdMap is only ever added to, and it is not cleared when shuffles expire/unregister (e.g., removeExpiredShuffle removes shuffleAllocatedWorkers, latestPartitionLocation, etc. but not this map). In long-running drivers this can grow without bound; consider removing the mapping when a shuffle is expired/removed (and/or when unregistering app shuffles).
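One possible shape for that cleanup, as a hedged sketch only: `ShuffleIdTracker`, `register`, `removeExpired` and the `allocated` map are hypothetical stand-ins and do not mirror the real `LifecycleManager` fields or the real `removeExpiredShuffle` signature. The point is simply that the mapping entry is dropped in the same place as the other per-shuffle state.

```scala
import java.util.concurrent.ConcurrentHashMap

class ShuffleIdTracker {
  // Stand-in for other per-shuffle state that is already cleaned up on expiry.
  private val allocated = new ConcurrentHashMap[Integer, String]()

  // Stand-in for the celebornShuffleId -> appShuffleId mapping.
  private val celebornToApp = new ConcurrentHashMap[Integer, Integer]()

  def register(celebornShuffleId: Int, appShuffleId: Int): Unit = {
    allocated.put(celebornShuffleId, s"workers-for-$celebornShuffleId")
    celebornToApp.put(celebornShuffleId, appShuffleId)
  }

  // Remove the mapping together with the rest of the shuffle's state,
  // so the map cannot grow without bound in a long-running driver.
  def removeExpired(celebornShuffleId: Int): Unit = {
    allocated.remove(celebornShuffleId)
    celebornToApp.remove(celebornShuffleId)
  }

  def appShuffleIdOf(celebornShuffleId: Int): Option[Int] =
    Option(celebornToApp.get(celebornShuffleId)).map(_.intValue)

  def mappingSize: Int = celebornToApp.size()
}
```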
@s0nskar, please first resolve the compilation failure and address the Copilot comments.
@SteNicholas working on it!
What changes were proposed in this pull request?
Why are the changes needed?
`shuffleAllocatedWorkers` contains Celeborn shuffle IDs (`celebornShuffleId`); we need to use `appShuffleId` because DAGScheduler only understands app shuffle IDs.
Does this PR resolve a correctness bug?
No
Does this PR introduce any user-facing change?
No
How was this patch tested?
NA