Skip to content
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 32 additions & 5 deletions src/common/hashtable/src/hashjoin_hashtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::alloc::Allocator;
use std::marker::PhantomData;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;

use databend_common_base::hints::assume;
Expand Down Expand Up @@ -102,18 +103,31 @@ pub fn hash_bits() -> u32 {
}
}

pub struct HashJoinHashTable<K: Keyable, A: Allocator + Clone = DefaultAllocator> {
pub struct HashJoinHashTable<
K: Keyable,
const SKIP_DUPLICATES: bool = false,
A: Allocator + Clone = DefaultAllocator,
> {
pub(crate) pointers: Box<[u64], A>,
pub(crate) atomic_pointers: *mut AtomicU64,
pub(crate) hash_shift: usize,
pub(crate) phantom: PhantomData<K>,
pub(crate) count: AtomicUsize,
}

unsafe impl<K: Keyable + Send, A: Allocator + Clone + Send> Send for HashJoinHashTable<K, A> {}
unsafe impl<K: Keyable + Send, A: Allocator + Clone + Send, const SKIP_DUPLICATES: bool> Send
for HashJoinHashTable<K, SKIP_DUPLICATES, A>
{
}

unsafe impl<K: Keyable + Sync, A: Allocator + Clone + Sync> Sync for HashJoinHashTable<K, A> {}
unsafe impl<K: Keyable + Sync, A: Allocator + Clone + Sync, const SKIP_DUPLICATES: bool> Sync
for HashJoinHashTable<K, SKIP_DUPLICATES, A>
{
}

impl<K: Keyable, A: Allocator + Clone + Default> HashJoinHashTable<K, A> {
impl<K: Keyable, A: Allocator + Clone + Default + 'static, const SKIP_DUPLICATES: bool>
HashJoinHashTable<K, SKIP_DUPLICATES, A>
{
pub fn with_build_row_num(row_num: usize) -> Self {
let capacity = std::cmp::max((row_num * 2).next_power_of_two(), 1 << 10);
let mut hashtable = Self {
Expand All @@ -123,6 +137,7 @@ impl<K: Keyable, A: Allocator + Clone + Default> HashJoinHashTable<K, A> {
atomic_pointers: std::ptr::null_mut(),
hash_shift: (hash_bits() - capacity.trailing_zeros()) as usize,
phantom: PhantomData,
count: Default::default(),
};
hashtable.atomic_pointers = unsafe {
std::mem::transmute::<*mut u64, *mut AtomicU64>(hashtable.pointers.as_mut_ptr())
Expand All @@ -138,6 +153,12 @@ impl<K: Keyable, A: Allocator + Clone + Default> HashJoinHashTable<K, A> {
// `index` is less than the capacity of hash table.
let mut old_header = unsafe { (*self.atomic_pointers.add(index)).load(Ordering::Relaxed) };
loop {
if SKIP_DUPLICATES
&& early_filtering(old_header, hash)
&& self.next_contains(&key, remove_header_tag(old_header))
{
return;
}
let res = unsafe {
(*self.atomic_pointers.add(index)).compare_exchange_weak(
old_header,
Expand All @@ -151,11 +172,13 @@ impl<K: Keyable, A: Allocator + Clone + Default> HashJoinHashTable<K, A> {
Err(x) => old_header = x,
};
}
self.count.fetch_add(1, Ordering::Relaxed);
unsafe { (*entry_ptr).next = remove_header_tag(old_header) };
}
}

impl<K, A> HashJoinHashtableLike for HashJoinHashTable<K, A>
impl<K, A, const SKIP_DUPLICATES: bool> HashJoinHashtableLike
for HashJoinHashTable<K, SKIP_DUPLICATES, A>
where
K: Keyable,
A: Allocator + Clone + 'static,
Expand Down Expand Up @@ -373,4 +396,8 @@ where
}
0
}

fn len(&self) -> usize {
self.count.load(Ordering::Relaxed)
}
}
36 changes: 31 additions & 5 deletions src/common/hashtable/src/hashjoin_string_hashtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::alloc::Allocator;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;

use databend_common_base::hints::assume;
Expand All @@ -38,17 +39,29 @@ pub struct StringRawEntry {
pub next: u64,
}

pub struct HashJoinStringHashTable<A: Allocator + Clone = DefaultAllocator> {
pub struct HashJoinStringHashTable<
const SKIP_DUPLICATES: bool = false,
A: Allocator + Clone = DefaultAllocator,
> {
pub(crate) pointers: Box<[u64], A>,
pub(crate) atomic_pointers: *mut AtomicU64,
pub(crate) hash_shift: usize,
pub(crate) count: AtomicUsize,
}

unsafe impl<A: Allocator + Clone + Send> Send for HashJoinStringHashTable<A> {}
unsafe impl<A: Allocator + Clone + Send, const SKIP_DUPLICATES: bool> Send
for HashJoinStringHashTable<SKIP_DUPLICATES, A>
{
}

unsafe impl<A: Allocator + Clone + Sync> Sync for HashJoinStringHashTable<A> {}
unsafe impl<A: Allocator + Clone + Sync, const SKIP_DUPLICATES: bool> Sync
for HashJoinStringHashTable<SKIP_DUPLICATES, A>
{
}

impl<A: Allocator + Clone + Default> HashJoinStringHashTable<A> {
impl<A: Allocator + Clone + Default + 'static, const SKIP_DUPLICATES: bool>
HashJoinStringHashTable<SKIP_DUPLICATES, A>
{
pub fn with_build_row_num(row_num: usize) -> Self {
let capacity = std::cmp::max((row_num * 2).next_power_of_two(), 1 << 10);
let mut hashtable = Self {
Expand All @@ -57,6 +70,7 @@ impl<A: Allocator + Clone + Default> HashJoinStringHashTable<A> {
},
atomic_pointers: std::ptr::null_mut(),
hash_shift: (hash_bits() - capacity.trailing_zeros()) as usize,
count: Default::default(),
};
hashtable.atomic_pointers = unsafe {
std::mem::transmute::<*mut u64, *mut AtomicU64>(hashtable.pointers.as_mut_ptr())
Expand All @@ -72,6 +86,12 @@ impl<A: Allocator + Clone + Default> HashJoinStringHashTable<A> {
// `index` is less than the capacity of hash table.
let mut old_header = unsafe { (*self.atomic_pointers.add(index)).load(Ordering::Relaxed) };
loop {
if SKIP_DUPLICATES
&& early_filtering(old_header, hash)
&& self.next_contains(key, remove_header_tag(old_header))
{
return;
}
let res = unsafe {
(*self.atomic_pointers.add(index)).compare_exchange_weak(
old_header,
Expand All @@ -85,11 +105,13 @@ impl<A: Allocator + Clone + Default> HashJoinStringHashTable<A> {
Err(x) => old_header = x,
};
}
self.count.fetch_add(1, Ordering::Relaxed);
unsafe { (*entry_ptr).next = remove_header_tag(old_header) };
}
}

impl<A> HashJoinHashtableLike for HashJoinStringHashTable<A>
impl<A, const SKIP_DUPLICATES: bool> HashJoinHashtableLike
for HashJoinStringHashTable<SKIP_DUPLICATES, A>
where A: Allocator + Clone + 'static
{
type Key = [u8];
Expand Down Expand Up @@ -341,4 +363,8 @@ where A: Allocator + Clone + 'static
}
0
}

fn len(&self) -> usize {
self.count.load(Ordering::Relaxed)
}
}
6 changes: 4 additions & 2 deletions src/common/hashtable/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,10 @@ pub use hashjoin_string_hashtable::StringRawEntry;
pub use hashjoin_string_hashtable::STRING_EARLY_SIZE;
pub use keys_ref::KeysRef;
pub use partitioned_hashtable::hash2bucket;
pub type HashJoinHashMap<K> = hashjoin_hashtable::HashJoinHashTable<K>;
pub type BinaryHashJoinHashMap = hashjoin_string_hashtable::HashJoinStringHashTable;
pub type HashJoinHashMap<K, const SKIP_DUPLICATES: bool = false> =
hashjoin_hashtable::HashJoinHashTable<K, SKIP_DUPLICATES>;
pub type BinaryHashJoinHashMap<const SKIP_DUPLICATES: bool = false> =
hashjoin_string_hashtable::HashJoinStringHashTable<SKIP_DUPLICATES>;
pub use traits::HashJoinHashtableLike;
pub use utils::fast_memcmp;
pub use utils::Interval;
Expand Down
6 changes: 6 additions & 0 deletions src/common/hashtable/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,4 +569,10 @@ pub trait HashJoinHashtableLike {

// Find the next matched ptr.
fn next_matched_ptr(&self, key: &Self::Key, ptr: u64) -> u64;

fn len(&self) -> usize;

fn is_empty(&self) -> bool {
self.len() == 0
}
}
13 changes: 13 additions & 0 deletions src/query/ast/src/ast/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,15 @@ impl Display for TableReference {
JoinOperator::RightAsof => {
write!(f, " ASOF RIGHT JOIN")?;
}
JoinOperator::InnerAny => {
write!(f, " INNER ANY JOIN")?;
}
JoinOperator::LeftAny => {
write!(f, " LEFT ANY JOIN")?;
}
JoinOperator::RightAny => {
write!(f, " RIGHT ANY JOIN")?;
}
}
write!(f, " {}", join.right)?;
match &join.condition {
Expand Down Expand Up @@ -1134,6 +1143,10 @@ pub enum JoinOperator {
Asof,
LeftAsof,
RightAsof,
// Any
InnerAny,
LeftAny,
RightAny,
}

#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
Expand Down
3 changes: 3 additions & 0 deletions src/query/ast/src/parser/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -671,11 +671,14 @@ pub fn table_alias_without_as(i: Input) -> IResult<TableAlias> {

pub fn join_operator(i: Input) -> IResult<JoinOperator> {
alt((
value(JoinOperator::InnerAny, rule! { INNER ~ ANY }),
value(JoinOperator::Inner, rule! { INNER }),
value(JoinOperator::LeftSemi, rule! { LEFT? ~ SEMI }),
value(JoinOperator::RightSemi, rule! { RIGHT ~ SEMI }),
value(JoinOperator::LeftAnti, rule! { LEFT? ~ ANTI }),
value(JoinOperator::RightAnti, rule! { RIGHT ~ ANTI }),
value(JoinOperator::LeftAny, rule! { LEFT ~ ANY }),
value(JoinOperator::RightAny, rule! { RIGHT ~ ANY }),
value(JoinOperator::LeftOuter, rule! { LEFT ~ OUTER? }),
value(JoinOperator::RightOuter, rule! { RIGHT ~ OUTER? }),
value(JoinOperator::FullOuter, rule! { FULL ~ OUTER? }),
Expand Down
5 changes: 4 additions & 1 deletion src/query/expression/src/kernels/group_by_hash/method.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,10 @@ macro_rules! with_join_hash_method {
( | $t:tt | $($tail:tt)* ) => {
match_template::match_template! {
$t = [Serializer, SingleBinary, KeysU8, KeysU16,
KeysU32, KeysU64, KeysU128, KeysU256],
KeysU32, KeysU64, KeysU128, KeysU256, SkipDuplicatesSerializer,
SkipDuplicatesSingleBinary, SkipDuplicatesKeysU8, SkipDuplicatesKeysU16,
SkipDuplicatesKeysU32, SkipDuplicatesKeysU64, SkipDuplicatesKeysU128,
SkipDuplicatesKeysU256],
$($tail)*
}
}
Expand Down
11 changes: 9 additions & 2 deletions src/query/service/src/physical_plans/physical_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,11 @@ impl PhysicalPlanBuilder {
build_side: &PhysicalPlan,
) -> Result<DataSchemaRef> {
match join_type {
JoinType::Left | JoinType::LeftSingle | JoinType::LeftAsof | JoinType::Full => {
JoinType::Left
| JoinType::LeftAny
| JoinType::LeftSingle
| JoinType::LeftAsof
| JoinType::Full => {
let build_schema = build_side.output_schema()?;
// Wrap nullable type for columns in build side
let build_schema = DataSchemaRefExt::create(
Expand Down Expand Up @@ -805,7 +809,7 @@ impl PhysicalPlanBuilder {
let left_expr_for_runtime_filter = self.prepare_runtime_filter_expr(left_condition)?;

// Handle inner join column optimization
if join.join_type == JoinType::Inner {
if matches!(join.join_type, JoinType::Inner | JoinType::InnerAny) {
self.handle_inner_join_column_optimization(
left_condition,
right_condition,
Expand Down Expand Up @@ -1034,9 +1038,12 @@ impl PhysicalPlanBuilder {
let merged_fields = match join.join_type {
JoinType::Cross
| JoinType::Inner
| JoinType::InnerAny
| JoinType::Left
| JoinType::LeftAny
| JoinType::LeftSingle
| JoinType::Right
| JoinType::RightAny
| JoinType::RightSingle
| JoinType::Full => {
let mut result = probe_fields.clone();
Expand Down
6 changes: 6 additions & 0 deletions src/query/service/src/physical_plans/physical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_sql::binder::JoinPredicate;
use databend_common_sql::optimizer::ir::RelExpr;
Expand Down Expand Up @@ -40,6 +41,11 @@ pub fn physical_join(join: &Join, s_expr: &SExpr) -> Result<PhysicalJoinType> {
JoinType::Asof | JoinType::LeftAsof | JoinType::RightAsof
);

if join.equi_conditions.is_empty() && join.join_type.is_any_join() {
return Err(ErrorCode::SemanticError(
"ANY JOIN only supports equality-based hash joins",
));
}
if !join.equi_conditions.is_empty() && !check_asof {
// Contain equi condition, use hash join
return Ok(PhysicalJoinType::Hash);
Expand Down
Loading
Loading