feat: refactor ConnectedComponents API #803

Open

SemyonSinchenko wants to merge 28 commits into graphframes:main from SemyonSinchenko:775-refactor-cc-api

Conversation

@SemyonSinchenko
Collaborator

What changes were proposed in this pull request?

  • Refactor the API to simplify switching between algorithms
  • Expose the RC (randomized contraction) algorithm
  • Rename the "graphframes" algorithm to "two_phase"
  • A new attempt at adding an AQE-friendly TP (two-phase) implementation

Why are the changes needed?

Close #775
Close #759
Close #758

@SemyonSinchenko
Collaborator Author

On my benchmarks (wiki-Talk, ~2M vertices / ~5M edges):

  • TP + skewedJoin =~ 130 seconds
  • TP + AQE =~ 20 seconds
  • RC =~ 30 seconds

@SemyonSinchenko
Collaborator Author

@james-willis it is very important to review this one carefully :) I tried to keep the existing behavior as is and did not change any default behavior.

cc: @sonalgoyal you may be interested in skipping prepare
cc: @greg-kennedy on the local small-sized benchmark, TP outperforms RC. I am not sure whether that is a problem with my implementation or whether TP is simply a better fit for the Apache Spark model.

@russelljurney-upside
Contributor

@claude review please

@russelljurney-upside
Contributor

@cursor review please, find bugs

@russelljurney-upside
Contributor

Via @claude:

Overview

This PR refactors the Connected Components implementation by:

  1. Extracting the two-phase algorithm into a new TwoPhase.scala object
  2. Adding an AQE-friendly code path (runAQE) that avoids disabling AQE globally
  3. Renaming the "graphframes" algorithm to "two_phase" (with deprecation alias)
  4. Exposing isGraphPrepared to let advanced users skip internal graph preparation
  5. Adding checkpointing support to RandomizedContraction

Critical Issues

  1. Global AQE config mutation is not thread-safe (TwoPhase.scala:189-192)

run() reads spark.sql.adaptive.enabled, sets it to "false", then restores in finally. Concurrent callers on the same SparkSession will race on this config. The runAQE path avoids this — good — but run() is
still the default for broadcastThreshold != -1.
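The hazard can be illustrated without Spark. The sketch below is a plain-Python stand-in (the real code is Scala; `conf` here is a hypothetical dict, not the SparkSession API) showing a deterministic interleaving of two callers running the read/set/restore pattern: the second caller reads the temporary "false" as its "original" value and restores it last, leaving AQE disabled.

```python
# Stand-in for a shared SparkSession conf (illustrative, not the real API).
conf = {"spark.sql.adaptive.enabled": "true"}

def save_and_disable():
    """The read / set-to-"false" pattern used by run()."""
    original = conf["spark.sql.adaptive.enabled"]
    conf["spark.sql.adaptive.enabled"] = "false"
    return original

def restore(saved):
    """The restore-in-finally half of the pattern."""
    conf["spark.sql.adaptive.enabled"] = saved

def race_demo():
    """A deterministic interleaving of two concurrent callers A and B."""
    saved_a = save_and_disable()  # A reads "true", disables AQE
    saved_b = save_and_disable()  # B reads the temporary "false" as its "original"
    restore(saved_a)              # A restores "true"
    restore(saved_b)              # B restores "false": AQE stays disabled
    return conf["spark.sql.adaptive.enabled"]
```

Even with try/finally on each caller, the final state depends on the interleaving, which is why a path that never mutates the shared session conf (as runAQE does) is the safer design.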

  2. Heavy code duplication between run and runAQE

These two methods are ~80-90% identical (~120 lines each). Differences are only join strategy (skewed vs. plain) and checkpoint strategy. Any bug fix must be applied in two places. Should extract shared
iteration logic.
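The suggested extraction can be sketched as follows (a Python toy with hypothetical names; the real loop is Scala operating on DataFrames): the shared fixed-point loop is written once and parameterized by the only parts that differ between the two variants, the join step and the checkpoint strategy.

```python
def iterate_to_fixpoint(init, join_step, checkpoint_step, converged, max_iter):
    """Shared iteration loop, written once; per-variant behavior is injected.

    join_step:       skewed join in run(), plain join in runAQE()
    checkpoint_step: per-variant checkpoint/persist strategy
    """
    curr = init
    for _ in range(max_iter):
        nxt = checkpoint_step(join_step(curr))
        if converged(curr, nxt):
            return nxt
        curr = nxt
    return curr

def min_label_step(labels):
    """Toy join step: every vertex adopts the global minimum label."""
    m = min(labels)
    return [m for _ in labels]
```

With this shape, a bug fix in the loop body applies to both variants automatically; only the injected steps remain variant-specific.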

Medium Issues

  1. Convergence via sum comparison is theoretically fragile (TwoPhase.scala:293, 413)

currSum == prevSum on sum(MIN_NBR) cast to DecimalType(38,0) could theoretically produce false convergence if two different label assignments produce the same sum. Extremely unlikely in practice but worth
documenting.
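A concrete collision is easy to construct (toy integer labels for illustration, not the real column values): two different label assignments over the same four vertices whose sums agree, so a sum-only check cannot distinguish them.

```python
# Two different vertex -> label assignments with identical label sums.
assignment_a = {1: 1, 2: 1, 3: 3, 4: 3}  # components {1,2} and {3,4}; sum = 8
assignment_b = {1: 1, 2: 2, 3: 2, 4: 3}  # a different labeling;       sum = 8

def label_sum(assignment):
    """Stand-in for sum(MIN_NBR) used as the convergence signal."""
    return sum(assignment.values())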

  2. isGraphPrepared silently ignored for GraphX (ConnectedComponents.scala:140-144)

If a user sets isGraphPrepared = true and uses ALGO_GRAPHX, the flag has no effect and no warning is issued. Should warn or throw.

  3. setIsGraphPrepared warns even when setting to false (ConnectedComponents.scala:127-133)

The warning should be conditional on value == true.
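The suggested fix amounts to gating the log call on the flag value. A minimal Python sketch with hypothetical names (the real setter is Scala and logs through the Spark logger):

```python
class CCSettings:
    """Hypothetical settings object; logged_warnings stands in for logWarning."""

    def __init__(self):
        self.graph_prepared = False
        self.logged_warnings = []

    def set_is_graph_prepared(self, value):
        # The suggested fix: warn only when preparation is actually skipped.
        if value:
            self.logged_warnings.append(
                "isGraphPrepared=true: internal graph preparation will be "
                "skipped; results are undefined for an unprepared graph")
        self.graph_prepared = value
        return self
```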

  4. Benchmark still uses deprecated "graphframes" name

ConnectedComponentsBenchmark.scala:23 has @param(Array("graphframes", "graphx")) — should use "two_phase" and consider adding "randomized_contraction".

  5. Missing test coverage
  • isGraphPrepared = true is never tested anywhere
  • randomized_contraction is never tested through the ConnectedComponents routing logic (only via direct RandomizedContraction.run())
  • No test for deprecated "graphframes" → "two_phase" normalization
  • No LDBC integration test for AQE mode (broadcastThreshold = -1)

Low Issues

  1. Naming conventions
  • persisted_df (TwoPhase.scala:299, 419) uses snake_case — should be persistedDf
  • ee/vv are very terse for 100+ line methods
  • minNbrs1/minNbrs2 are opaque
  2. System.gc() call (TwoPhase.scala:279)

Explicit GC is a JVM anti-pattern — it's only a hint and can cause stop-the-world pauses on the driver.

  3. ALGO_GRAPHFRAMES companion object val lacks @deprecated annotation (ConnectedComponents.scala:189)

Only has a Scaladoc tag, which doesn't produce compiler warnings at call sites.

  4. Documentation typos
  • is_direted → is_directed (05-traversals.md:66)
  • cluste → cluster (line 371)
  5. Persist after localCheckpoint may be redundant (TwoPhase.scala:268+283)

localCheckpoint(eager=true) already materializes data; subsequent .persist(intermediateStorageLevel) on the same DataFrame applies a different storage level to already-checkpointed data.

  6. Checkpoint cleanup is incomplete

Only the previous interval's checkpoint is cleaned up — the final checkpoint is never removed.

@SemyonSinchenko
Collaborator Author

1. Global AQE config mutation is not thread-safe (TwoPhase.scala:189-192)

run() reads spark.sql.adaptive.enabled, sets it to "false", then restores in finally. Concurrent callers on the same SparkSession will race on this config. The runAQE path avoids this — good — but run() is still the default for broadcastThreshold != -1.

That is existing code; the problem is out of scope.

2. Heavy code duplication between run and runAQE

These two methods are ~80-90% identical (~120 lines each). Differences are only join strategy (skewed vs. plain) and checkpoint strategy. Any bug fix must be applied in two places. Should extract shared iteration logic.

That was intentional. The goal is to guarantee the existing behavior; last time we were forced to roll back.

3. Convergence via sum comparison is theoretically fragile (TwoPhase.scala:293, 413)

currSum == prevSum on sum(MIN_NBR) cast to DecimalType(38,0) could theoretically produce false convergence if two different label assignments produce the same sum. Extremely unlikely in practice but worth documenting.

That is existing code; the problem is out of scope.

4. isGraphPrepared silently ignored for GraphX (ConnectedComponents.scala:140-144)

If a user sets isGraphPrepared = true and uses ALGO_GRAPHX, the flag has no effect and no warning is issued. Should warn or throw.

That is intentional. Anyone who uses isGraphPrepared knows what they are doing.

5. setIsGraphPrepared warns even when setting to false (ConnectedComponents.scala:127-133)

The warning should be conditional on value == true.

I see zero reason to do it; the comment lacks the context of the method.

6. Benchmark still uses deprecated "graphframes" name

ConnectedComponentsBenchmark.scala:23 has @param(Array("graphframes", "graphx")) — should use "two_phase" and consider adding "randomized_contraction".

That is existing code; the problem is out of scope.

7. Missing test coverage

* isGraphPrepared = true is never tested anywhere

I see zero reason to do it; the comment lacks the context of the method.

* randomized_contraction is never tested through the ConnectedComponents routing logic (only via direct RandomizedContraction.run())

* No test for deprecated "graphframes" → "two_phase" normalization

* No LDBC integration test for AQE mode (broadcastThreshold = -1)

Test coverage is fine.

8. Naming conventions


* persisted_df (TwoPhase.scala:299, 419) uses snake_case — should be persistedDf

* ee/vv are very terse for 100+ line methods

* minNbrs1/minNbrs2 are opaque

That is the only reasonable comment; I will update persisted_df -> persistedDf.

9. System.gc() call (TwoPhase.scala:279)

Explicit GC is a JVM anti-pattern — it's only a hint and can cause stop-the-world pauses on the driver.

Too generic. System.gc() is standard practice in the Spark world.

10. ALGO_GRAPHFRAMES companion object val lacks @deprecated annotation (ConnectedComponents.scala:189)

Only has a Scaladoc tag, which doesn't produce compiler warnings at call sites.

scalafix says otherwise (there was a warning "use scala annotations instead of java annotations")

11. Documentation typos


* is_direted → is_directed (05-traversals.md:66)

* cluste → cluster (line 371)

Out of scope.

12. Persist after localCheckpoint may be redundant (TwoPhase.scala:268+283)

localCheckpoint(eager=true) already materializes data; subsequent .persist(intermediateStorageLevel) on the same DataFrame applies a different storage level to already-checkpointed data.

That is fine, just check the logic.

13. Checkpoint cleanup is incomplete

Only the previous interval's checkpoint is cleaned up — the final checkpoint is never removed.

Not 100% sure what this is about.

@greg-kennedy

  1. Checkpoint cleanup is incomplete
    Only the previous interval's checkpoint is cleaned up — the final checkpoint is never removed.

This is a difficult pattern in Spark in all cases... if you unpersist the final checkpoint before returning, then it's been deleted before the caller can make use of it :)
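One common way out of this bind, sketched below in Python with hypothetical names (the real code is Scala, and the "store" stands in for files under the checkpoint directory): return the result together with a cleanup handle, so deleting the final checkpoint becomes an explicit caller decision rather than something the algorithm does before the caller has consumed the result.

```python
class FakeCheckpointStore:
    """Stand-in for checkpoint files on disk."""

    def __init__(self):
        self.live_checkpoints = 0

def run_connected_components(store):
    """Hypothetical shape: return (result, cleanup). The final checkpoint
    outlives the call until the caller explicitly releases it."""
    store.live_checkpoints += 1  # final checkpoint materialized here
    components = [0, 0, 1]

    def cleanup():
        # Caller invokes this once it has materialized or copied the result.
        store.live_checkpoints -= 1

    return components, cleanup
```

The alternative is for the algorithm to eagerly collect or re-persist the result before removing its last checkpoint, trading an extra materialization for automatic cleanup.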

@russelljurney-upside
Contributor

Sorry if Claude wasn't helpful, I'll direct it to only review changes native to the PR next time.
