Skip to content

feat(pregel): automatically skip second join when dst columns not needed#795

Open
james-willis wants to merge 11 commits intographframes:mainfrom
james-willis:feat/auto-skip-dst-join-790
Open

feat(pregel): automatically skip second join when dst columns not needed#795
james-willis wants to merge 11 commits intographframes:mainfrom
james-willis:feat/auto-skip-dst-join-790

Conversation

@james-willis
Copy link
Collaborator

Summary

Implements automatic optimization for Pregel triplet generation that skips the second join (adding destination vertex state) when no message expressions reference dst.* columns.

Closes #790

Changes

  • Added extractColumnPrefixes helper method to SparkShims (both Spark 3 and Spark 4 versions) to analyze Column expressions and extract column name prefixes
  • Modified Pregel.run() to detect if destination vertex state is needed by analyzing all message expressions
  • When dst.* columns are not referenced AND skipMessagesFromNonActiveVertices is disabled, the second join is automatically skipped
  • Added 5 comprehensive tests for the optimization

How it works

  1. Detection phase (before iteration loop): Analyze all message expressions (sendMsgToSrc/sendMsgToDst) to collect which column prefixes are referenced
  2. Conditional join: If no expressions reference dst.* columns, skip the second join entirely
val needsDstState = allReferencedPrefixes.contains(DST) || skipMessagesFromNonActiveVertices

Algorithms that benefit

  • PageRank: Only uses Pregel.src("rank") and Pregel.src("outDegree")
  • LabelPropagation (directed mode): Only uses Pregel.src(LABEL_ID)
  • DetectingCycles: Uses Pregel.src(storedSeqCol) and Pregel.dst(GraphFrame.ID), but ID is available from the join key

Testing

All 227 core tests pass, including 5 new tests specifically for this optimization:

  • automatic dst join skipping - PageRank only uses src columns
  • automatic dst join NOT skipped when dst columns are referenced
  • automatic dst join NOT skipped when skipMessagesFromNonActiveVertices is enabled
  • automatic dst join skipping - sendMsgToSrc with only edge columns
  • automatic dst join skipping - edge columns only

@james-willis
Copy link
Collaborator Author

sharing comments from SEM to get them to my personal machine:

try to move this block (https://github.com/graphframes/graphframes/blob/main/core/src/main/scala/org/graphframes/lib/Pregel.scala#L442-L445) before join in case we are skipping dst
change partitioning here (https://github.com/graphframes/graphframes/blob/main/core/src/main/scala/org/graphframes/lib/Pregel.scala#L402) to src only

@james-willis james-willis marked this pull request as ready for review February 28, 2026 21:54
.select(col(SRC).alias("edge_src"), col(DST).alias("edge_dst"), struct(col("*")).as(EDGE))
.repartition(col("edge_src"), col("edge_dst"))
.repartition(
(if (needsDstState) Seq(col("edge_src"), col("edge_dst")) else Seq(col("edge_src"))): _*)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do need repartition by both in both branches. It was a mistake to add it: it makes shuffle required on both branches (join by src and join by dst). Let's simplify and optimize by always do partitioning by the src only.

.drop(col("edge_src"), col("edge_dst"))

// Optimization: persist srcWithEdges when skipping dst join to avoid recomputation
if (!needsDstState) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why do we need persist here and I'm against it. We already have persisted edges. Adding persisted triplets will blow the memory without any benefits.

.drop(col("edge_src"), col("edge_dst"))
}

if (skipMessagesFromNonActiveVertices) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do not need the second join, this condition can be moved up and be called before the first join that will be a big benefit for some algorithms. Can we do it?

james-willis and others added 11 commits March 4, 2026 11:43
Implements automatic optimization for Pregel triplet generation that
skips the second join (adding destination vertex state) when no message
expressions reference dst.* columns.

The optimization works by:
1. Analyzing all message expressions before the iteration loop
2. Extracting column prefixes (src, dst, edge) from the expression AST
3. Skipping the dst vertex join if no dst.* columns are referenced
   AND skipMessagesFromNonActiveVertices is disabled

This provides significant performance improvement for algorithms like
PageRank, directed LabelPropagation, and DetectingCycles that only
need source vertex or edge columns in their message expressions.

Closes graphframes#790
…id when skipping join

The previous implementation incorrectly checked both the target ID expression
and message expression for dst.* references. Since sendMsgToDst uses
Pregel.dst(ID) as the target, it would always detect 'dst' as referenced
even when the message itself only used src columns.

This fix:
1. Only analyzes the message expressions (not target ID) for dst.* references
2. When skipping the join, creates a minimal dst struct with just the id
   from edge_dst so that sendMsgToDst can still route messages correctly

Added test: 'sendMsgToDst with only src columns in message' to verify
the optimization works correctly when dst.id is implicitly used for routing.
- Add extractColumnReferences to SparkShims returning Map[String, Set[String]]
  to track which specific fields are accessed under each prefix
- Handle resolved expressions (AttributeReference, GetStructField) in addition
  to unresolved ones for more robust column detection
- Update Pregel optimization to skip dst join when only dst.id is referenced
  since dst.id is available from the edge's dst column
- Change optimization log message from logInfo to logDebug
- Add test for dst.id-only reference case
- Remove unused extractColumnPrefixes method from SparkShims (both Spark 3/4)
- Refactor Pregel.scala to parse expressions once instead of twice
- Add documentation for deeply nested struct access fallback behavior
…n optimization

- Add SparkShimsSuite with 22 unit tests for column reference extraction
- Add 4 integration tests to PregelSuite for complex dst usage patterns
- Fix UTF8String handling in UnresolvedExtractValue pattern matching
Address peer feedback to keep SparkShims implementation-agnostic by removing
specific algorithm references from comments while maintaining functional clarity.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Implement peer feedback suggestions:
1. Move cache/checkpoint logic before expensive operations - persist srcWithEdges
   when skipping dst join to avoid recomputation
2. Change partitioning to src-only when dst join is skipped since dst
   partitioning is unnecessary
3. Move dst state detection earlier to enable these optimizations

These changes provide additional performance improvements for algorithms like
PageRank that only need source vertex data.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Fix Scala syntax error in repartition call that was causing CI build failures.
Use proper sequence expansion syntax for multiple column repartitioning.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Fix Scala syntax error in repartition call that was causing CI build failures.
Use proper sequence expansion syntax for multiple column repartitioning.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Fix formatting issues that were causing CI scalafmt checks to fail.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
@james-willis james-willis force-pushed the feat/auto-skip-dst-join-790 branch from be54ce9 to 1fc9b3c Compare March 4, 2026 19:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat: add a way to skip second join for triplets generation in Pregel

2 participants