Decision is NOT based on row count → based on table size in bytes + memory availability.
| Condition | Best Join Type |
|---|---|
| Smaller table < 8GB + memory available | Broadcast Hash Join |
| Broadcast fails or >8GB | Shuffle Sort-Merge Join |
| Key duplication is high | Prefer Sort-Merge over Hash |
| Data skew exists | Salt keys, repartition, AQE |
Example:
```python
from pyspark.sql.functions import broadcast

df = big_df.join(broadcast(small_df), "id")
```
If you can't broadcast:
```python
df = big_df.repartition("id").join(small_df, "id")
```
When duplicate keys exist, broadcast hash join struggles due to large hash bucket expansion.
Best approach:
- Use Sort-Merge Join, as sorting groups duplicates efficiently.
- If skew too high → apply salting.
- If only few keys are hot → isolate heavy keys.
Example salting (salt the skewed side randomly; replicate the other side across all salt values so every key still matches):
```python
from pyspark.sql.functions import rand, floor, explode, array, lit

df = df.withColumn("salt", floor(rand() * 10).cast("int"))
df.join(other.withColumn("salt", explode(array(*[lit(i) for i in range(10)]))), ["id", "salt"])
```
Use Sort-Merge Join with:
- repartition on the join key
- filter pushdown before the join
- drop unused columns before the join
Example:
```python
df1 = df1.select("id", "col1")
df2 = df2.select("id", "col2")
df = df1.repartition("id").join(df2, "id")
```
Fixes (skewed joins):
- Salting
- Split heavy key + union
- Enable AQE dynamic skew optimization
- Broadcast smaller table if possible
Example heavy key isolation:
```python
df_hot = big_df.filter("id = 'USA'")
df_rest = big_df.filter("id != 'USA'")
final = df_hot.join(ref, "id").union(df_rest.join(ref, "id"))
```
Default threshold:
`spark.sql.autoBroadcastJoinThreshold` defaults to 10MB; broadcast also has a hard 8GB upper limit that no setting can raise.
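A quick config sketch for adjusting it (the 100MB value is illustrative, not a recommendation):
```python
# Raise the auto-broadcast threshold to ~100 MB (illustrative value)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024))

# Or disable automatic broadcast joins entirely
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
```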
But broadcast fails if:
- executor memory < broadcast size
- many cached DataFrames already using RAM
- wide rows → actual in-memory size > compressed file size
Always check the estimated size first:
```python
df_small._jdf.queryExecution().optimizedPlan().stats().sizeInBytes()
```
Use this quick decision table:
| Condition | Join Type |
|---|---|
| Small (<8GB) vs Huge | Broadcast Hash Join |
| Large vs Large | Sort-Merge Join |
| Key duplication high | Sort-Merge Join |
| Key skew | Salting / Split Logic |
| Memory low | SMJ or Disk-persist hash |
| Real-time + small dims | Broadcast lookup cache |
Memorize this — interview safe.
Because duplicated keys = large hash buckets = memory spikes. Sort-Merge groups identical keys contiguously → efficient nested join.
Option → partial broadcast / selective join.
Filter the dimension before broadcast:
```python
filtered = dim.filter("category = 'electronics'")
df.join(broadcast(filtered), "id")
```
Join only required fields:
Wrong:
```python
df_large.join(dim, "id")
```
Correct:
```python
dim = dim.select("id", "category")  # cut width
df = df_large.join(dim, "id")
```
Cutting column width reduces serialization and shuffle cost.
Not a compute issue → metadata overhead + small files + fragmentation.
Fix:
```sql
OPTIMIZE table ZORDER BY (id);
VACUUM table RETAIN 168 HOURS;  -- 7 days
```
Use SCD Type-2 + Delta Lake MERGE, and broadcast the dimension if small (MERGE sketch after the table below).
| Full Join | Incremental Join |
|---|---|
| scans entire 2B rows | scans only last partition |
| heavy | scalable |
| good for rare updates | necessary for daily loads |
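A minimal sketch of the incremental Delta MERGE, assuming the delta-spark package is installed and using hypothetical names `dim_customer` (target table) and `updates_df` (the incremental batch):
```python
from delta.tables import DeltaTable  # requires the delta-spark package

target = DeltaTable.forName(spark, "dim_customer")   # hypothetical target table
(target.alias("t")
    .merge(updates_df.alias("s"), "t.id = s.id")     # updates_df = incremental batch
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())
```
For strict SCD Type-2 you would expire the old row and insert a new version instead of updating in place; the sketch only shows the MERGE mechanics.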
AQE enables:
- dynamic broadcast
- shuffle coalescing
- skew handling auto-split
Enable:
```
spark.sql.adaptive.enabled=true
```
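The related flags can be set from code as well; a sketch using standard Spark 3.x properties:
```python
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")  # shuffle coalescing
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")            # auto-split skewed partitions
```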
- Repartition — increases partitions → more parallelism (causes a shuffle)
- Coalesce — reduces partitions → fewer files (see the write sketch below)
Joining big tables → repartition by key
```python
df.repartition(300, "id")
```
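Coalesce typically appears just before a write; a minimal sketch (the path and the partition count of 50 are illustrative):
```python
# Fewer output files without a full shuffle
df.coalesce(50).write.mode("overwrite").parquet("/tmp/output")  # illustrative path
```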
Driver OOM cause → collect(), toPandas(), large broadcast.
Fix:
- never collect a big DataFrame
- broadcast only small tables
- move Python UDFs → built-in SQL expressions (sketch below)
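A small sketch of the UDF-to-expression swap (the uppercase example is illustrative):
```python
from pyspark.sql.functions import col, upper

# Instead of a Python UDF, which ships rows out to Python workers:
# to_upper = udf(lambda s: s.upper() if s else None, StringType())
# df = df.withColumn("name_u", to_upper(col("name")))

# Prefer the built-in expression, which stays inside the JVM:
df = df.withColumn("name_u", upper(col("name")))
```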
Caching eliminates repeated shuffle for lookup-heavy joins.
But only cache if reused:
```python
dim.cache()
dim.count()  # action that materializes the cache
```
When joins happen repeatedly on the same key — bucketed tables avoid the shuffle.
Example:
```sql
-- column list abbreviated; bucket on the join key
CREATE TABLE fact (id BIGINT /* ... other columns ... */)
USING parquet
CLUSTERED BY (id) INTO 256 BUCKETS;
```
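The DataFrame-writer equivalent, as a sketch (`fact_df` is a hypothetical DataFrame; `bucketBy` only works with `saveAsTable`):
```python
(fact_df.write
    .bucketBy(256, "id")   # same bucket count and key on both sides avoids the join shuffle
    .sortBy("id")
    .saveAsTable("fact_bucketed"))
```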
| Join | Memory | Shuffle | Best Use Case |
|---|---|---|---|
| Broadcast Hash | High | Very Low | Small dim joins |
| Sort-Merge | Medium | High | Large-large joins |
| Shuffle Hash | High | High | Rarely chosen manually |
Use:
```python
df.explain("formatted")
```
Look for:
- Broadcast hash join
- Sort merge join
- Shuffle hash join
| Situation | Correct Join |
|---|---|
| 5M vs 2B — fits in memory | Broadcast |
| 5M vs 2B — repeated keys | Sort-Merge |
| Both >50M | Sort-Merge |
| Both skewed | Salt or split |
| Dimension reused repeatedly | Cache + Broadcast |
| Massive history table | ZORDER + VACUUM + MERGE incremental |
Use Shuffle-Hash when:
| Condition | Result |
|---|---|
| Both datasets large | Broadcast not possible |
| Sorting cost high | SMJ slower |
| Keys evenly distributed | Memory buckets efficient |
```python
df1.join(df2.hint("SHUFFLE_HASH"), "id")
```
If key duplication or skew exists → hash join creates huge hash buckets → OOM.
| Example Problem |
|---|
| 60% of rows have same key → bucket imbalance |
In this case Sort-Merge Join is safer.
```python
df1.repartition("id").join(df2, "id")
```
Sorting both sides → expensive. Hash join without sorting → cheaper.
| Join Type | Result |
|---|---|
| Broadcast | No (too large) |
| Sort-Merge | Works but slow |
| Shuffle-Hash | Best if keys uniform |
```python
df1.join(df2.hint("SHUFFLE_HASH"), "product_id")
```
Earlier → Broadcast was ideal. Now > threshold → choose Sort-Merge Join.
```python
big.repartition("cust_id").join(dim, "cust_id")
```
Broadcast = no longer viable.
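To state the choice explicitly, Spark 3.x also accepts a sort-merge join hint; a sketch:
```python
# Explicitly request a sort-merge join
big.join(dim.hint("MERGE"), "cust_id")
```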
E-commerce event log join:
| Table | Size | Distribution |
|---|---|---|
| logs | 25GB | uniform |
| clicks | 18GB | uniform |
Sorting 43GB = slow → Shuffle-Hash faster by ~35%
```python
df_logs.join(clicks.hint("SHUFFLE_HASH"), "session_id")
```
customers (small table):
| id | name | segment |
|---|---|---|
| 101 | John | Gold |
| 102 | Tara | Bronze |
sales (large fact table):
| id | amount |
|---|---|
| 101 | 1400 |
| 102 | 900 |
Why Broadcast? Small → fits memory → avoids shuffling 2.4B rows.
```python
sales.join(broadcast(customers), "id")
```
click_logs:
| user_id | page |
|---|---|
| 1001 | /home |
| 1001 | /cart |
user_profile:
| user_id | plan |
|---|---|
| 1001 | Gold |
| 1001 | Pro |
Broadcast + hash = risky. Sort-Merge handles the duplication better.
```python
click_logs.repartition("user_id").join(user_profile, "user_id")
```
sales_2025 (partitioned by month):
| month | rows |
|---|---|
| Jan | 250M |
| Feb | 240M |
customer_dim = 3.5M rows
```python
sales_2025.filter("month = 'Feb'") \
    .join(broadcast(customer_dim), "cust_id")
```
Only 1 partition scanned → best for Broadcast Join.
orders partitioned by country
join required on user_id
| Issue |
|---|
| Partition key ≠ join key → shuffle inevitable |
Use SMJ + possible salting for skew:
```python
orders.repartition("user_id").join(users, "user_id")
```
Both tables huge + evenly spread keys:
| Condition | Pick |
|---|---|
| No broadcast | ✔ |
| SMJ too expensive (sorting TB) | ✔ |
| Keys uniform | ✔ Shuffle-Hash wins |
```python
df1.join(df2.hint("SHUFFLE_HASH"), "id")
```
fact (transactions):
| txn_id | cust_id | amount | date |
|---|---|---|---|
| 5001 | C101 | 900 | 2025-05-01 |
| 5002 | C101 | 300 | 2025-05-01 |
| 5003 | C102 | 1200 | 2025-05-01 |
| 5004 | C103 | 800 | 2025-05-02 |
dim (customers):
| cust_id | name | city |
|---|---|---|
| C101 | John | Delhi |
| C102 | Aditi | Mumbai |
| C103 | Ryan | NYC |
Scenario 1 → Only date='2025-05-01' required → only 33% partitions scanned → Broadcast is ideal
```python
fact.filter("date = '2025-05-01'") \
    .join(broadcast(dim), "cust_id")
```
Scenario 2 → cust_id highly duplicated (C101 = 70% of rows)
→ Broadcast may work BUT risk of bucket-explosion in hash
→ Better = Sort-Merge Join + Salting
```python
from pyspark.sql.functions import rand, floor, explode, array, lit

# Salt the skewed fact side with a random bucket 0-9
fact = fact.withColumn("salt", floor(rand() * 10).cast("int"))
# Replicate every dim row across all 10 salt values so each salted key finds its match
dim = dim.withColumn("salt", explode(array(*[lit(i) for i in range(10)])))
result = fact.join(dim, ["cust_id", "salt"])
```
Now you can answer ANY join-based interview question confidently.
```
Is one table small (< 8GB)?
├── YES → BROADCAST HASH JOIN (fastest, no shuffle)
└── NO  → Are the tables very large?
          ├── YES → Keys uniform + enough memory?
          │         ├── YES → SHUFFLE HASH JOIN (avoids sorting cost)
          │         └── NO  → SORT-MERGE JOIN (stable for heavy duplicates)
          └── NO  → Skew / duplicates present?
                    └── SALTING + SMJ
```