Under what rules does Trino convert a LEFT JOIN into an INNER JOIN? #23357

Open
chengwei977 opened this issue Sep 11, 2024 · 1 comment

@chengwei977

My Trino version is 442. I get incorrect results when executing the following SQL.

SQL1:

WITH temp_1 AS (
    SELECT
        ym,
        cydymc,
        kplx,
        budat,
        matnr,
        sum(abs(fkimg)) AS fkimg,
        sum(abs(srjh)) AS srjh,
        CASE
            WHEN sum(abs(fkimg)) = 0 THEN 0
            ELSE sum(abs(srjh)) / sum(abs(fkimg))
        END AS dj
    FROM
        ice.dwd.dwd_zb_tzzxfx_cpjg
    GROUP BY
        ym,
        cydymc,
        kplx,
        budat,
        matnr
),
temp_2 AS (
    SELECT
        DISTINCT date_sap AS date_sap
    FROM
        ice.dim.dim_date
    WHERE
        date_sap >= '20180101'
        AND date_sap <= date_format(current_date, '%Y%m%d')
),
temp_3 AS (
    SELECT
        DISTINCT ym
    FROM
        ice.dwd.dwd_zb_tzzxfx_cpjg
),
temp_4 AS (
    SELECT
        a.date_sap,
        b.ym
    FROM
        temp_2 a
    JOIN temp_3 b
    ON
        date_diff('month', date_parse(substring(date_sap, 1, 6), '%Y%m'), date_parse(ym, '%Y%m')) < 24
        AND date_diff('month', date_parse(substring(date_sap, 1, 6), '%Y%m'), date_parse(ym, '%Y%m')) >= 0
),
temp_5 AS (
    SELECT
        a.date_sap,
        b.ym,
        b.budat,
        b.cydymc,
        b.kplx,
        b.budat,
        b.matnr
    FROM
        temp_4 a
    LEFT JOIN temp_1 b
    ON
        a.ym = b.ym
        AND a.date_sap = b.budat
)
SELECT
    ym,
    date_sap,
    count(*)
FROM
    temp_5
WHERE
    ym = '202408'
GROUP BY
    ym,
    date_sap;

I expect 731 result rows, but the query returns only 462.

=====================================================

When I run the temp_4 CTE on its own, it returns 731 rows. The SQL is as follows:

SQL2:

WITH temp_2 AS (
    SELECT
        DISTINCT date_sap AS date_sap
    FROM
        ice.dim.dim_date
    WHERE
        date_sap >= '20180101'
        AND date_sap <= date_format(current_date, '%Y%m%d')
),
temp_3 AS (
    SELECT
        DISTINCT ym
    FROM
        ice.dwd.dwd_zb_tzzxfx_cpjg
),
temp_4 AS (
    SELECT
        a.date_sap,
        b.ym
    FROM
        temp_2 a
    JOIN temp_3 b
    ON
        date_diff('month', date_parse(substring(date_sap, 1, 6), '%Y%m'), date_parse(ym, '%Y%m')) < 24
        AND date_diff('month', date_parse(substring(date_sap, 1, 6), '%Y%m'), date_parse(ym, '%Y%m')) >= 0
)
SELECT * FROM temp_4 WHERE ym = '202408';
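As a sanity check on the 731 figure: for ym = '202408', the ON condition of temp_4 keeps every date_sap whose month is 0 to 23 months before 2024-08, i.e. the months 2022-09 through 2024-08. The Python sketch below is an illustration, not Trino code. It mirrors that month arithmetic, assumes that dim_date holds exactly one row per calendar day, and takes the upper bound 20240911 from the BETWEEN predicate in the plan. The helpers month_index and dates_in_window are hypothetical names.

```python
from datetime import date, timedelta


def month_index(d: date) -> int:
    """Absolute month number, so subtracting two indices gives a whole-month count."""
    return d.year * 12 + (d.month - 1)


def dates_in_window(ym: str, start: date, end: date) -> list[date]:
    """Days in [start, end] whose month lies 0..23 months before ym (temp_4's ON clause)."""
    target = month_index(date(int(ym[:4]), int(ym[4:6]), 1))
    days = []
    d = start
    while d <= end:
        diff = target - month_index(d)  # like date_diff('month', month of date_sap, ym)
        if 0 <= diff < 24:
            days.append(d)
        d += timedelta(days=1)
    return days


# Bounds taken from the plan's predicate: date_sap BETWEEN '20180101' AND '20240911'.
window = dates_in_window("202408", date(2018, 1, 1), date(2024, 9, 11))
print(len(window), window[0], window[-1])  # 731 2022-09-01 2024-08-31
```

The window runs from 2022-09-01 to 2024-08-31, which is 365 + 366 days because 2024 is a leap year. That total matches the 731 rows returned by SQL2.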

====================================

This is the execution plan of SQL1. It shows that the join defined in the temp_5 CTE, written as a LEFT JOIN, was changed into an INNER JOIN (the InnerJoin node in Fragment 0).

Trino version: 442
Fragment 0 [HASH]
    Output layout: [ym_2, date_sap, count]
    Output partitioning: SINGLE []
    Output[columnNames = [ym, date_sap, _col2]]
    │   Layout: [ym_2:varchar, date_sap:varchar, count:bigint]
    │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
    │   ym := ym_2
    │   _col2 := count
    └─ Aggregate[keys = [ym_2, date_sap]]
       │   Layout: [ym_2:varchar, date_sap:varchar, count:bigint]
       │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
       │   count := count(*)
       └─ InnerJoin[criteria = (date_sap = budat_8), distribution = PARTITIONED]
          │   Layout: [date_sap:varchar, ym_2:varchar]
          │   Estimates: {rows: ? (?), cpu: ?, memory: 165.20kB, network: 0B}
          │   Distribution: PARTITIONED
          │   dynamicFilterAssignments = {budat_8 -> #df_1174}
          ├─ FilterProject[filterPredicate = ((date_diff('month', date_parse(substring(date_sap, bigint '1', bigint '6'), '%Y%m'), date_parse(ym, '%Y%m')) < bigint '24') AND (date_diff('month', date_parse(substring(date_sap, bigint '1', bigint '6'), '%Y%m'), date_parse(ym, '%Y%m')) >= bigint '0'))]
          │  │   Layout: [date_sap:varchar]
          │  │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
          │  └─ CrossJoin[]
          │     │   Layout: [date_sap:varchar, ym:varchar]
          │     │   Estimates: {rows: ? (?), cpu: ?, memory: 10B, network: 0B}
          │     │   Distribution: REPLICATED
          │     ├─ Aggregate[type = FINAL, keys = [date_sap]]
          │     │  │   Layout: [date_sap:varchar]
          │     │  │   Estimates: {rows: ? (?), cpu: 127.29k, memory: ?, network: 0B}
          │     │  └─ LocalExchange[partitioning = HASH, arguments = [date_sap]]
          │     │     │   Layout: [date_sap:varchar]
          │     │     │   Estimates: {rows: 10851 (127.29kB), cpu: 127.29k, memory: 0B, network: 0B}
          │     │     └─ RemoteSource[sourceFragmentIds = [1]]
          │     │            Layout: [date_sap:varchar]
          │     └─ LocalExchange[partitioning = SINGLE]
          │        │   Layout: [ym:varchar]
          │        │   Estimates: {rows: 1 (5B), cpu: 0, memory: 0B, network: 0B}
          │        └─ RemoteSource[sourceFragmentIds = [2]]
          │               Layout: [ym:varchar]
          └─ LocalExchange[partitioning = HASH, arguments = [budat_8]]
             │   Layout: [ym_2:varchar, budat_8:varchar]
             │   Estimates: {rows: 12509 (165.20kB), cpu: 165.20k, memory: 0B, network: 0B}
             └─ RemoteSource[sourceFragmentIds = [4]]
                    Layout: [ym_2:varchar, budat_8:varchar]

Fragment 1 [SOURCE]
    Output layout: [date_sap]
    Output partitioning: HASH [date_sap]
    Aggregate[type = PARTIAL, keys = [date_sap]]
    │   Layout: [date_sap:varchar]
    │   Estimates: {rows: 10851 (127.29kB), cpu: ?, memory: ?, network: ?}
    └─ ScanFilter[table = ice:dim.dim_date$data@6706129778701784257, filterPredicate = ((date_sap BETWEEN varchar '20180101' AND varchar '20240911') AND (date_diff('month', date_parse(substring(date_sap, bigint '1', bigint '6'), '%Y%m'), timestamp(3) '2024-08-01 00:00:00.000') < bigint '24') AND (date_diff('month', date_parse(substring(date_sap, bigint '1', bigint '6'), '%Y%m'), timestamp(3) '2024-08-01 00:00:00.000') >= bigint '0')), dynamicFilters = {date_sap = #df_1174}]
           Layout: [date_sap:varchar]
           Estimates: {rows: 12057 (141.43kB), cpu: 141.43k, memory: 0B, network: 0B}/{rows: 10851 (127.29kB), cpu: 141.43k, memory: 0B, network: 0B}
           date_sap := 1:date_sap:varchar

Fragment 2 [HASH]
    Output layout: [ym]
    Output partitioning: BROADCAST []
    Aggregate[type = FINAL, keys = [ym]]
    │   Layout: [ym:varchar]
    │   Estimates: {rows: 1 (5B), cpu: 70.11k, memory: 5B, network: 0B}
    └─ LocalExchange[partitioning = HASH, arguments = [ym]]
       │   Layout: [ym:varchar]
       │   Estimates: {rows: 13899 (70.11kB), cpu: 70.11k, memory: 0B, network: 0B}
       └─ RemoteSource[sourceFragmentIds = [3]]
              Layout: [ym:varchar]

Fragment 3 [SOURCE]
    Output layout: [ym]
    Output partitioning: HASH [ym]
    Aggregate[type = PARTIAL, keys = [ym]]
    │   Layout: [ym:varchar]
    │   Estimates: {rows: 13899 (70.11kB), cpu: ?, memory: ?, network: ?}
    └─ ScanFilter[table = ice:dwd.dwd_zb_tzzxfx_cpjg$data@4280907653966917687, filterPredicate = (ym = varchar '202408')]
           Layout: [ym:varchar]
           Estimates: {rows: 1403801 (6.92MB), cpu: 6.92M, memory: 0B, network: 0B}/{rows: 13899 (70.11kB), cpu: 6.92M, memory: 0B, network: 0B}
           ym := 1:ym:varchar

Fragment 4 [HASH]
    Output layout: [ym_2, budat_8]
    Output partitioning: HASH [budat_8]
    Project[]
    │   Layout: [ym_2:varchar, budat_8:varchar]
    │   Estimates: {rows: 12509 (165.20kB), cpu: 165.20k, memory: 0B, network: 0B}
    └─ Aggregate[type = FINAL, keys = [ym_2, cydymc_3, kplx_7, budat_8, matnr_13]]
       │   Layout: [ym_2:varchar, cydymc_3:varchar, kplx_7:varchar, budat_8:varchar, matnr_13:varchar]
       │   Estimates: {rows: 12509 (387.74kB), cpu: 387.74k, memory: 387.74kB, network: 0B}
       └─ LocalExchange[partitioning = HASH, arguments = [ym_2, cydymc_3, kplx_7, budat_8, matnr_13]]
          │   Layout: [ym_2:varchar, cydymc_3:varchar, kplx_7:varchar, budat_8:varchar, matnr_13:varchar]
          │   Estimates: {rows: 12509 (387.74kB), cpu: 387.74k, memory: 0B, network: 0B}
          └─ RemoteSource[sourceFragmentIds = [5]]
                 Layout: [ym_2:varchar, cydymc_3:varchar, kplx_7:varchar, budat_8:varchar, matnr_13:varchar]

Fragment 5 [SOURCE]
    Output layout: [ym_2, cydymc_3, kplx_7, budat_8, matnr_13]
    Output partitioning: HASH [ym_2, cydymc_3, kplx_7, budat_8, matnr_13]
    Aggregate[type = PARTIAL, keys = [ym_2, cydymc_3, kplx_7, budat_8, matnr_13]]
    │   Layout: [ym_2:varchar, cydymc_3:varchar, kplx_7:varchar, budat_8:varchar, matnr_13:varchar]
    │   Estimates: {rows: 12509 (387.74kB), cpu: ?, memory: ?, network: ?}
    └─ ScanFilter[table = ice:dwd.dwd_zb_tzzxfx_cpjg$data@4280907653966917687, filterPredicate = ((budat_8 BETWEEN varchar '20180101' AND varchar '20240911') AND (ym_2 = varchar '202408') AND (date_diff('month', date_parse(substring(budat_8, bigint '1', bigint '6'), '%Y%m'), timestamp(3) '2024-08-01 00:00:00.000') < bigint '24') AND (date_diff('month', date_parse(substring(budat_8, bigint '1', bigint '6'), '%Y%m'), timestamp(3) '2024-08-01 00:00:00.000') >= bigint '0'))]
           Layout: [ym_2:varchar, cydymc_3:varchar, kplx_7:varchar, budat_8:varchar, matnr_13:varchar]
           Estimates: {rows: 1403801 (42.49MB), cpu: 42.49M, memory: 0B, network: 0B}/{rows: 12509 (387.74kB), cpu: 42.49M, memory: 0B, network: 0B}
           cydymc_3 := 2:cydymc:varchar
           budat_8 := 7:budat:varchar
           kplx_7 := 6:kplx:varchar
           ym_2 := 1:ym:varchar
           matnr_13 := 12:matnr:varchar
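One likely explanation, assuming standard SQL outer-join semantics: temp_5 projects b.ym, which is the column from the nullable (right) side of the LEFT JOIN. For every temp_4 row with no match in temp_1, b.ym is NULL. The outer filter ym = '202408' therefore evaluates to NULL for that row and discards it. Because the filter always removes the NULL-extended rows, the LEFT JOIN returns the same rows as an INNER JOIN, so an optimizer may legally rewrite it. That would be consistent with the InnerJoin node in Fragment 0 above and with the predicate (ym_2 = varchar '202408') pushed into Fragment 5. The sketch below runs SQLite through Python's sqlite3 module rather than Trino. The tables a and b and their rows are hypothetical stand-ins for temp_4 and temp_1.

```python
import sqlite3

# Hypothetical stand-ins: a plays temp_4 (date_sap, ym), b plays temp_1 (budat, ym).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE a (date_sap TEXT, ym TEXT);
    CREATE TABLE b (budat TEXT, ym TEXT);
    INSERT INTO a VALUES ('20240801', '202408'), ('20240802', '202408'), ('20240803', '202408');
    INSERT INTO b VALUES ('20240801', '202408');  -- only the first date has a match
""")

# Bare LEFT JOIN: unmatched left rows are NULL-extended, so b.ym comes back as None.
unfiltered = conn.execute("""
    SELECT a.date_sap, b.ym
    FROM a LEFT JOIN b ON a.ym = b.ym AND a.date_sap = b.budat
    ORDER BY a.date_sap
""").fetchall()


def temp_5_rows(join_kind: str) -> list[tuple]:
    """Run the temp_5 shape (projecting b.ym), then apply the outer filter ym = '202408'."""
    if join_kind not in ("LEFT", "INNER"):
        raise ValueError("join_kind must be 'LEFT' or 'INNER'")
    sql = f"""
        WITH temp_5 AS (
            SELECT a.date_sap, b.ym AS ym
            FROM a {join_kind} JOIN b ON a.ym = b.ym AND a.date_sap = b.budat
        )
        SELECT date_sap, ym FROM temp_5 WHERE ym = '202408' ORDER BY date_sap
    """
    return conn.execute(sql).fetchall()


left_rows = temp_5_rows("LEFT")
inner_rows = temp_5_rows("INNER")
print(unfiltered)               # [('20240801', '202408'), ('20240802', None), ('20240803', None)]
print(left_rows == inner_rows)  # True: the filter on b.ym removes every NULL-extended row
```

The two unmatched dates come back from the bare LEFT JOIN with ym set to NULL. Neither survives the filter, so the LEFT and INNER variants both return the single matched row.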

===================================================================

If I change select a.date_sap, b.ym, ... in the temp_5 definition to select a.date_sap, a.ym, ... (other columns unchanged), the plan goes back to a LEFT JOIN and the query returns the expected 731 rows. The plan for that version is below. Although I have found this workaround, I don't understand what happens in between. Can anyone help?

Trino version: 442
Fragment 0 [HASH]
    Output layout: [ym, date_sap, count]
    Output partitioning: SINGLE []
    Output[columnNames = [ym, date_sap, _col2]]
    │   Layout: [ym:varchar, date_sap:varchar, count:bigint]
    │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
    │   _col2 := count
    └─ Aggregate[keys = [ym, date_sap]]
       │   Layout: [ym:varchar, date_sap:varchar, count:bigint]
       │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
       │   count := count(*)
       └─ LeftJoin[criteria = (ym = ym_2) AND (date_sap = budat_8), distribution = PARTITIONED]
          │   Layout: [date_sap:varchar, ym:varchar]
          │   Estimates: {rows: ? (?), cpu: ?, memory: 165.20kB, network: 0B}
          │   Distribution: PARTITIONED
          ├─ Filter[filterPredicate = ((date_diff('month', date_parse(substring(date_sap, bigint '1', bigint '6'), '%Y%m'), date_parse(ym, '%Y%m')) < bigint '24') AND (date_diff('month', date_parse(substring(date_sap, bigint '1', bigint '6'), '%Y%m'), date_parse(ym, '%Y%m')) >= bigint '0'))]
          │  │   Layout: [date_sap:varchar, ym:varchar]
          │  │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
          │  └─ CrossJoin[]
          │     │   Layout: [date_sap:varchar, ym:varchar]
          │     │   Estimates: {rows: ? (?), cpu: ?, memory: 10B, network: 0B}
          │     │   Distribution: REPLICATED
          │     ├─ Aggregate[type = FINAL, keys = [date_sap]]
          │     │  │   Layout: [date_sap:varchar]
          │     │  │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
          │     │  └─ LocalExchange[partitioning = HASH, arguments = [date_sap]]
          │     │     │   Layout: [date_sap:varchar]
          │     │     │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
          │     │     └─ RemoteSource[sourceFragmentIds = [1]]
          │     │            Layout: [date_sap:varchar]
          │     └─ LocalExchange[partitioning = SINGLE]
          │        │   Layout: [ym:varchar]
          │        │   Estimates: {rows: 1 (5B), cpu: 0, memory: 0B, network: 0B}
          │        └─ RemoteSource[sourceFragmentIds = [2]]
          │               Layout: [ym:varchar]
          └─ LocalExchange[partitioning = HASH, arguments = [ym_2, budat_8]]
             │   Layout: [ym_2:varchar, budat_8:varchar]
             │   Estimates: {rows: 12509 (165.20kB), cpu: 165.20k, memory: 0B, network: 0B}
             └─ RemoteSource[sourceFragmentIds = [4]]
                    Layout: [ym_2:varchar, budat_8:varchar]

Fragment 1 [SOURCE]
    Output layout: [date_sap]
    Output partitioning: HASH [date_sap]
    Aggregate[type = PARTIAL, keys = [date_sap]]
    │   Layout: [date_sap:varchar]
    └─ ScanFilter[table = ice:dim.dim_date$data@6706129778701784257, filterPredicate = (date_sap BETWEEN varchar '20180101' AND varchar '20240911')]
           Layout: [date_sap:varchar]
           Estimates: {rows: 12057 (141.43kB), cpu: 141.43k, memory: 0B, network: 0B}/{rows: ? (?), cpu: 141.43k, memory: 0B, network: 0B}
           date_sap := 1:date_sap:varchar

Fragment 2 [HASH]
    Output layout: [ym]
    Output partitioning: BROADCAST []
    Aggregate[type = FINAL, keys = [ym]]
    │   Layout: [ym:varchar]
    │   Estimates: {rows: 1 (5B), cpu: 70.11k, memory: 5B, network: 0B}
    └─ LocalExchange[partitioning = HASH, arguments = [ym]]
       │   Layout: [ym:varchar]
       │   Estimates: {rows: 13899 (70.11kB), cpu: 70.11k, memory: 0B, network: 0B}
       └─ RemoteSource[sourceFragmentIds = [3]]
              Layout: [ym:varchar]

Fragment 3 [SOURCE]
    Output layout: [ym]
    Output partitioning: HASH [ym]
    Aggregate[type = PARTIAL, keys = [ym]]
    │   Layout: [ym:varchar]
    │   Estimates: {rows: 13899 (70.11kB), cpu: ?, memory: ?, network: ?}
    └─ ScanFilter[table = ice:dwd.dwd_zb_tzzxfx_cpjg$data@4280907653966917687, filterPredicate = (ym = varchar '202408')]
           Layout: [ym:varchar]
           Estimates: {rows: 1403801 (6.92MB), cpu: 6.92M, memory: 0B, network: 0B}/{rows: 13899 (70.11kB), cpu: 6.92M, memory: 0B, network: 0B}
           ym := 1:ym:varchar

Fragment 4 [HASH]
    Output layout: [ym_2, budat_8]
    Output partitioning: HASH [budat_8]
    Project[]
    │   Layout: [ym_2:varchar, budat_8:varchar]
    │   Estimates: {rows: 12509 (165.20kB), cpu: 165.20k, memory: 0B, network: 0B}
    └─ Aggregate[type = FINAL, keys = [ym_2, cydymc_3, kplx_7, budat_8, matnr_13]]
       │   Layout: [ym_2:varchar, cydymc_3:varchar, kplx_7:varchar, budat_8:varchar, matnr_13:varchar]
       │   Estimates: {rows: 12509 (387.74kB), cpu: 387.74k, memory: 387.74kB, network: 0B}
       └─ LocalExchange[partitioning = HASH, arguments = [ym_2, cydymc_3, kplx_7, budat_8, matnr_13]]
          │   Layout: [ym_2:varchar, cydymc_3:varchar, kplx_7:varchar, budat_8:varchar, matnr_13:varchar]
          │   Estimates: {rows: 12509 (387.74kB), cpu: 387.74k, memory: 0B, network: 0B}
          └─ RemoteSource[sourceFragmentIds = [5]]
                 Layout: [ym_2:varchar, cydymc_3:varchar, kplx_7:varchar, budat_8:varchar, matnr_13:varchar]

Fragment 5 [SOURCE]
    Output layout: [ym_2, cydymc_3, kplx_7, budat_8, matnr_13]
    Output partitioning: HASH [ym_2, cydymc_3, kplx_7, budat_8, matnr_13]
    Aggregate[type = PARTIAL, keys = [ym_2, cydymc_3, kplx_7, budat_8, matnr_13]]
    │   Layout: [ym_2:varchar, cydymc_3:varchar, kplx_7:varchar, budat_8:varchar, matnr_13:varchar]
    │   Estimates: {rows: 12509 (387.74kB), cpu: ?, memory: ?, network: ?}
    └─ ScanFilter[table = ice:dwd.dwd_zb_tzzxfx_cpjg$data@4280907653966917687, filterPredicate = ((budat_8 BETWEEN varchar '20180101' AND varchar '20240911') AND (ym_2 = varchar '202408') AND (date_diff('month', date_parse(substring(budat_8, bigint '1', bigint '6'), '%Y%m'), timestamp(3) '2024-08-01 00:00:00.000') < bigint '24') AND (date_diff('month', date_parse(substring(budat_8, bigint '1', bigint '6'), '%Y%m'), timestamp(3) '2024-08-01 00:00:00.000') >= bigint '0'))]
           Layout: [ym_2:varchar, cydymc_3:varchar, kplx_7:varchar, budat_8:varchar, matnr_13:varchar]
           Estimates: {rows: 1403801 (42.49MB), cpu: 42.49M, memory: 0B, network: 0B}/{rows: 12509 (387.74kB), cpu: 42.49M, memory: 0B, network: 0B}
           cydymc_3 := 2:cydymc:varchar
           budat_8 := 7:budat:varchar
           kplx_7 := 6:kplx:varchar
           ym_2 := 1:ym:varchar
           matnr_13 := 12:matnr:varchar
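Assuming standard SQL outer-join semantics, the same reasoning would explain why the workaround restores the count. a.ym comes from the preserved (left) side and is never NULL, so ym = '202408' no longer rejects the unmatched rows. The LEFT JOIN then can no longer be treated as an INNER JOIN, which matches the LeftJoin node in the plan above. Below is a minimal sketch under the same assumptions as before: it uses SQLite through Python's sqlite3 module rather than Trino, and the hypothetical tables a and b stand in for temp_4 and temp_1. The helper count_for is also a hypothetical name.

```python
import sqlite3

# Hypothetical stand-ins: a plays temp_4 (date_sap, ym), b plays temp_1 (budat, ym).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE a (date_sap TEXT, ym TEXT);
    CREATE TABLE b (budat TEXT, ym TEXT);
    INSERT INTO a VALUES ('20240801', '202408'), ('20240802', '202408'), ('20240803', '202408');
    INSERT INTO b VALUES ('20240801', '202408');  -- only the first date has a match
""")


def count_for(ym_source: str) -> int:
    """Count temp_5 rows with ym = '202408' when temp_5 projects ym from table a or b."""
    if ym_source not in ("a", "b"):
        raise ValueError("ym_source must be 'a' or 'b'")
    sql = f"""
        WITH temp_5 AS (
            SELECT a.date_sap, {ym_source}.ym AS ym
            FROM a LEFT JOIN b ON a.ym = b.ym AND a.date_sap = b.budat
        )
        SELECT count(*) FROM temp_5 WHERE ym = '202408'
    """
    return conn.execute(sql).fetchone()[0]


print(count_for("b"))  # 1: unmatched rows carry ym = NULL and are filtered out
print(count_for("a"))  # 3: a.ym is never NULL, so every left-side row is kept
```

In this toy data, projecting b.ym keeps only the matched date, like the 462-row result. Projecting a.ym keeps every date from the left side, like the 731-row result.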

@Praveen2112
Member

Can you provide a set of reproduction steps, including the data, so that we can check it out?
