feat(pregel): automatically skip second join when dst columns not needed#795
feat(pregel): automatically skip second join when dst columns not needed#795james-willis wants to merge 11 commits intographframes:mainfrom
Conversation
core/src/main/scala-spark-3/org/apache/spark/sql/graphframes/SparkShims.scala
Outdated
Show resolved
Hide resolved
|
sharing comments from SEM to get them to my personal machine:
|
| .select(col(SRC).alias("edge_src"), col(DST).alias("edge_dst"), struct(col("*")).as(EDGE)) | ||
| .repartition(col("edge_src"), col("edge_dst")) | ||
| .repartition( | ||
| (if (needsDstState) Seq(col("edge_src"), col("edge_dst")) else Seq(col("edge_src"))): _*) |
There was a problem hiding this comment.
We do need repartition by both in both branches. It was a mistake to add it: it makes shuffle required on both branches (join by src and join by dst). Let's simplify and optimize by always do partitioning by the src only.
| .drop(col("edge_src"), col("edge_dst")) | ||
|
|
||
| // Optimization: persist srcWithEdges when skipping dst join to avoid recomputation | ||
| if (!needsDstState) { |
There was a problem hiding this comment.
I don't understand why do we need persist here and I'm against it. We already have persisted edges. Adding persisted triplets will blow the memory without any benefits.
| .drop(col("edge_src"), col("edge_dst")) | ||
| } | ||
|
|
||
| if (skipMessagesFromNonActiveVertices) { |
There was a problem hiding this comment.
If we do not need the second join, this condition can be moved up and be called before the first join that will be a big benefit for some algorithms. Can we do it?
Implements automatic optimization for Pregel triplet generation that skips the second join (adding destination vertex state) when no message expressions reference dst.* columns. The optimization works by: 1. Analyzing all message expressions before the iteration loop 2. Extracting column prefixes (src, dst, edge) from the expression AST 3. Skipping the dst vertex join if no dst.* columns are referenced AND skipMessagesFromNonActiveVertices is disabled This provides significant performance improvement for algorithms like PageRank, directed LabelPropagation, and DetectingCycles that only need source vertex or edge columns in their message expressions. Closes graphframes#790
…id when skipping join The previous implementation incorrectly checked both the target ID expression and message expression for dst.* references. Since sendMsgToDst uses Pregel.dst(ID) as the target, it would always detect 'dst' as referenced even when the message itself only used src columns. This fix: 1. Only analyzes the message expressions (not target ID) for dst.* references 2. When skipping the join, creates a minimal dst struct with just the id from edge_dst so that sendMsgToDst can still route messages correctly Added test: 'sendMsgToDst with only src columns in message' to verify the optimization works correctly when dst.id is implicitly used for routing.
- Add extractColumnReferences to SparkShims returning Map[String, Set[String]] to track which specific fields are accessed under each prefix - Handle resolved expressions (AttributeReference, GetStructField) in addition to unresolved ones for more robust column detection - Update Pregel optimization to skip dst join when only dst.id is referenced since dst.id is available from the edge's dst column - Change optimization log message from logInfo to logDebug - Add test for dst.id-only reference case
- Remove unused extractColumnPrefixes method from SparkShims (both Spark 3/4) - Refactor Pregel.scala to parse expressions once instead of twice - Add documentation for deeply nested struct access fallback behavior
…n optimization - Add SparkShimsSuite with 22 unit tests for column reference extraction - Add 4 integration tests to PregelSuite for complex dst usage patterns - Fix UTF8String handling in UnresolvedExtractValue pattern matching
Address peer feedback to keep SparkShims implementation-agnostic by removing specific algorithm references from comments while maintaining functional clarity. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
Implement peer feedback suggestions: 1. Move cache/checkpoint logic before expensive operations - persist srcWithEdges when skipping dst join to avoid recomputation 2. Change partitioning to src-only when dst join is skipped since dst partitioning is unnecessary 3. Move dst state detection earlier to enable these optimizations These changes provide additional performance improvements for algorithms like PageRank that only need source vertex data. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
Fix Scala syntax error in repartition call that was causing CI build failures. Use proper sequence expansion syntax for multiple column repartitioning. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
Fix Scala syntax error in repartition call that was causing CI build failures. Use proper sequence expansion syntax for multiple column repartitioning. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
Fix formatting issues that were causing CI scalafmt checks to fail. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
be54ce9 to
1fc9b3c
Compare
Summary
Implements automatic optimization for Pregel triplet generation that skips the second join (adding destination vertex state) when no message expressions reference
dst.*columns.Closes #790
Changes
extractColumnPrefixeshelper method toSparkShims(both Spark 3 and Spark 4 versions) to analyze Column expressions and extract column name prefixesPregel.run()to detect if destination vertex state is needed by analyzing all message expressionsdst.*columns are not referenced ANDskipMessagesFromNonActiveVerticesis disabled, the second join is automatically skippedHow it works
sendMsgToSrc/sendMsgToDst) to collect which column prefixes are referenceddst.*columns, skip the second join entirelyAlgorithms that benefit
Pregel.src("rank")andPregel.src("outDegree")Pregel.src(LABEL_ID)Pregel.src(storedSeqCol)andPregel.dst(GraphFrame.ID), but ID is available from the join keyTesting
All 227 core tests pass, including 5 new tests specifically for this optimization:
automatic dst join skipping - PageRank only uses src columnsautomatic dst join NOT skipped when dst columns are referencedautomatic dst join NOT skipped when skipMessagesFromNonActiveVertices is enabledautomatic dst join skipping - sendMsgToSrc with only edge columnsautomatic dst join skipping - edge columns only