#pragma once
/// NOTE(review): in this copy of the file everything that was written inside angle brackets appears to be
/// missing: the `#include` target below, the template parameter lists after each `template`, and explicit
/// template arguments (e.g. after `std::vector`, `std::forward`, `std::make_unique`, `assert_cast`).
/// Restore them from the original before compiling; the comments below describe only what is visible here.
#include
namespace DB
{
namespace ErrorCodes
{
extern const int UNSUPPORTED_JOIN_KEYS;
extern const int LOGICAL_ERROR;
}

/// Dispatches insertion of `rows` rows into the hash map variant selected by `type`.
///
/// EMPTY and CROSS: nothing is put into a map; `is_inserted` is set to true and 0 is returned.
/// Every variant listed by APPLY_FOR_JOIN_VARIANTS: the call is forwarded to insertFromBlockImplTypeCase
/// with the matching member of `maps`, and that call's result is returned.
///
/// NOTE(review): the switch has no `default:` label and nothing is returned after it; presumably every
/// HashJoin::Type value is covered by the cases above — confirm. The `break;` after `return` inside the
/// macro cannot be reached.
template
size_t HashJoinMethods::insertFromBlockImpl(
    HashJoin & join,
    HashJoin::Type type,
    MapsTemplate & maps,
    size_t rows,
    const ColumnRawPtrs & key_columns,
    const Sizes & key_sizes,
    Block * stored_block,
    ConstNullMapPtr null_map,
    UInt8ColumnDataPtr join_mask,
    Arena & pool,
    bool & is_inserted)
{
    switch (type)
    {
        case HashJoin::Type::EMPTY:
            [[fallthrough]];
        case HashJoin::Type::CROSS:
            /// Do nothing. We will only save block, and it is enough
            is_inserted = true;
            return 0;

#define M(TYPE) \
    case HashJoin::Type::TYPE: \
        return insertFromBlockImplTypeCase< \
            typename KeyGetterForType>::Type>( \
            join, *maps.TYPE, rows, key_columns, key_sizes, stored_block, null_map, join_mask, pool, is_inserted); \
        break;

            APPLY_FOR_JOIN_VARIANTS(M)
#undef M
    }
}

/// Joins the left `block` with the stored right-side data and returns the tail of `block` that was not processed.
///
/// Visible steps, in order:
///  1. build one key description per ON clause (`join_on_keys`); `is_join_get` selects `key_names_right`
///     instead of `key_names_left`;
///  2. for RIGHT/FULL joins materialize `block` in place;
///  3. probe the maps via switchJoinRightColumns, which returns the number of left rows handled (`num_joined`);
///  4. sliceBlock() cuts `block` down to the first `num_joined` rows; the remainder becomes the return value;
///  5. append the added (right-side) columns to `block`;
///  6. filter and/or replicate the columns that existed before step 5, and add right key columns copied from
///     the corresponding left key columns;
///  7. erase left columns that are not listed in the left output columns (only when
///     join.canRemoveColumnsFromLeftBlock() allows it).
///
/// `block` is modified in place and holds the joined result on return.
template
Block HashJoinMethods::joinBlockImpl(
    const HashJoin & join, Block & block, const Block & block_with_columns_to_add, const MapsTemplateVector & maps_, bool is_join_get)
{
    constexpr JoinFeatures join_features;

    std::vector join_on_keys;
    const auto & onexprs = join.table_join->getClauses();
    for (size_t i = 0; i < onexprs.size(); ++i)
    {
        const auto & key_names = !is_join_get ? onexprs[i].key_names_left : onexprs[i].key_names_right;
        join_on_keys.emplace_back(block, key_names, onexprs[i].condColumnNames().first, join.key_sizes[i]);
    }

    /** If you use FULL or RIGHT JOIN, then the columns from the "left" table must be materialized.
      * Because if they are constants, then in the "not joined" rows, they may have different values
      *  - default values, which can differ from the values of these constants.
      */
    if constexpr (join_features.right || join_features.full)
    {
        materializeBlockInplace(block);
    }

    /** For LEFT/INNER JOIN, the saved blocks do not contain keys.
      * For FULL/RIGHT JOIN, the saved blocks contain keys;
      *  but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped.
      * For ASOF, the last column is used as the ASOF column
      */
    AddedColumns added_columns(
        block,
        block_with_columns_to_add,
        join.savedBlockSample(),
        join,
        std::move(join_on_keys),
        join.table_join->getMixedJoinExpression(),
        join_features.is_asof_join,
        is_join_get);

    /// A filter is also needed at runtime when right key columns have to be produced from the left keys.
    bool has_required_right_keys = (join.required_right_keys.columns() != 0);
    added_columns.need_filter = join_features.need_filter || has_required_right_keys;
    /// A zero limit is replaced by the numeric maximum; reserve() is called only when a non-zero limit is set.
    added_columns.max_joined_block_rows = join.max_joined_block_rows;
    if (!added_columns.max_joined_block_rows)
        added_columns.max_joined_block_rows = std::numeric_limits::max();
    else
        added_columns.reserve(join_features.need_replication);

    size_t num_joined = switchJoinRightColumns(maps_, added_columns, join.data->type, *join.used_flags);
    /// Do not hold memory for join_on_keys anymore
    added_columns.join_on_keys.clear();
    /// After this call `block` keeps only its first `num_joined` rows.
    Block remaining_block = sliceBlock(block, num_joined);

    if (is_join_get)
        added_columns.buildJoinGetOutput();
    else
        added_columns.buildOutput();

    /// Collect positions of left columns to drop. They are computed before the new columns are inserted,
    /// and the erase itself happens at the very end, after filtering/replication.
    const auto & table_join = join.table_join;
    std::set block_columns_to_erase;
    if (join.canRemoveColumnsFromLeftBlock())
    {
        std::unordered_set left_output_columns;
        for (const auto & out_column : table_join->getOutputColumns(JoinTableSide::Left))
            left_output_columns.insert(out_column.name);
        for (size_t i = 0; i < block.columns(); ++i)
        {
            if (!left_output_columns.contains(block.getByPosition(i).name))
                block_columns_to_erase.insert(i);
        }
    }

    /// Columns at positions [0, existing_columns) come from the left block; everything inserted below is new.
    size_t existing_columns = block.columns();
    for (size_t i = 0; i < added_columns.size(); ++i)
        block.insert(added_columns.moveColumn(i));

    std::vector right_keys_to_replicate [[maybe_unused]];

    if constexpr (join_features.need_filter)
    {
        /// If ANY INNER | RIGHT JOIN - filter all the columns except the new ones.
        for (size_t i = 0; i < existing_columns; ++i)
            block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->filter(added_columns.filter, -1);

        /// Add join key columns from right block if needed using value from left table because of equality
        for (size_t i = 0; i < join.required_right_keys.columns(); ++i)
        {
            const auto & right_key = join.required_right_keys.getByPosition(i);
            /// asof column is already in block.
            if (join_features.is_asof_join && right_key.name == join.table_join->getOnlyClause().key_names_right.back())
                continue;

            /// The left column was already filtered above, so no filter is passed to the copy here.
            const auto & left_column = block.getByName(join.required_right_keys_sources[i]);
            const auto & right_col_name = join.getTableJoin().renamedRightColumnName(right_key.name);
            auto right_col = copyLeftKeyColumnToRight(right_key.type, right_col_name, left_column);
            block.insert(std::move(right_col));
        }
    }
    else if (has_required_right_keys)
    {
        /// Add join key columns from right block if needed.
        for (size_t i = 0; i < join.required_right_keys.columns(); ++i)
        {
            const auto & right_key = join.required_right_keys.getByPosition(i);
            auto right_col_name = join.getTableJoin().renamedRightColumnName(right_key.name);
            /// asof column is already in block.
            if (join_features.is_asof_join && right_key.name == join.table_join->getOnlyClause().key_names_right.back())
                continue;

            /// Left columns are not filtered in this branch, so the filter is handed to the copy instead.
            const auto & left_column = block.getByName(join.required_right_keys_sources[i]);
            auto right_col = copyLeftKeyColumnToRight(right_key.type, right_col_name, left_column, &added_columns.filter);
            block.insert(std::move(right_col));

            if constexpr (join_features.need_replication)
                right_keys_to_replicate.push_back(block.getPositionByName(right_col_name));
        }
    }

    if constexpr (join_features.need_replication)
    {
        std::unique_ptr & offsets_to_replicate = added_columns.offsets_to_replicate;

        /// If ALL ... JOIN - we replicate all the columns except the new ones.
        for (size_t i = 0; i < existing_columns; ++i)
        {
            block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->replicate(*offsets_to_replicate);
        }

        /// Replicate additional right keys
        for (size_t pos : right_keys_to_replicate)
        {
            block.safeGetByPosition(pos).column = block.safeGetByPosition(pos).column->replicate(*offsets_to_replicate);
        }
    }

    block.erase(block_columns_to_erase);
    return remaining_block;
}

/// Constructs a KeyGetter over `key_columns` / `key_sizes`.
/// When `is_asof_join` holds, the last key column and the last key size are removed from copies of the
/// inputs first, so the getter is built without them.
template
template
KeyGetter HashJoinMethods::createKeyGetter(const ColumnRawPtrs & key_columns, const Sizes & key_sizes)
{
    if constexpr (is_asof_join)
    {
        auto key_column_copy = key_columns;
        auto key_size_copy = key_sizes;
        key_column_copy.pop_back();
        key_size_copy.pop_back();
        return KeyGetter(key_column_copy, key_size_copy, nullptr);
    }
    else
        return KeyGetter(key_columns, key_sizes, nullptr);
}

/// Inserts rows [0, rows) of `stored_block` into `map` and returns map.getBufferSizeInCells().
///
/// Rows flagged in `null_map` are not inserted (and force `is_inserted` to true); rows whose `join_mask`
/// entry is zero are skipped. Both pointers may be null, in which case the respective check is skipped.
/// `is_inserted` (out) starts as true unless the map stores a single row per key and the join is not ASOF;
/// in that case it becomes true only if some Inserter::insertOne call returns true or a NULL-key row is met.
template
template
size_t HashJoinMethods::insertFromBlockImplTypeCase(
    HashJoin & join,
    HashMap & map,
    size_t rows,
    const ColumnRawPtrs & key_columns,
    const Sizes & key_sizes,
    Block * stored_block,
    ConstNullMapPtr null_map,
    UInt8ColumnDataPtr join_mask,
    Arena & pool,
    bool & is_inserted)
{
    [[maybe_unused]] constexpr bool mapped_one = std::is_same_v;
    constexpr bool is_asof_join = STRICTNESS == JoinStrictness::Asof;

    /// For ASOF the last key column is the ASOF column; createKeyGetter drops it from the hashed keys.
    const IColumn * asof_column [[maybe_unused]] = nullptr;
    if constexpr (is_asof_join)
        asof_column = key_columns.back();

    auto key_getter = createKeyGetter(key_columns, key_sizes);

    /// For ALL and ASOF join always insert values
    is_inserted = !mapped_one || is_asof_join;

    for (size_t i = 0; i < rows; ++i)
    {
        if (null_map && (*null_map)[i])
        {
            /// nulls are not inserted into hash table,
            /// keep them for RIGHT and FULL joins
            is_inserted = true;
            continue;
        }

        /// Check condition for right table from ON section
        if (join_mask && !(*join_mask)[i])
            continue;

        if constexpr (is_asof_join)
            Inserter::insertAsof(join, map, key_getter, stored_block, i, pool, *asof_column);
        else if constexpr (mapped_one)
            is_inserted |= Inserter::insertOne(join, map, key_getter, stored_block, i, pool);
        else
            Inserter::insertAll(join, map, key_getter, stored_block, i, pool);
    }
    return map.getBufferSizeInCells();
}

/// Picks the key getter / map type matching `type` and forwards to joinRightColumnsSwitchNullability.
///
/// EMPTY: a single default-constructed KeyGetterEmpty and a single default-constructed map entry are used;
///        for ASOF strictness UNSUPPORTED_JOIN_KEYS is thrown instead.
/// Variants from APPLY_FOR_JOIN_VARIANTS: one key getter is created per entry of added_columns.join_on_keys
///        and the map pointer with the same index is taken from `mapv`.
/// Any other value: UNSUPPORTED_JOIN_KEYS is thrown.
///
/// NOTE(review): in the macro `a_map_type_vector` is sized by mapv.size() but indexed up to
/// join_on_keys.size(); presumably the two sizes are equal — confirm.
/// NOTE(review): createKeyGetter returns by value, so the `std::move(...)` wrapped around its call in the
/// macro looks redundant.
template
template
size_t HashJoinMethods::switchJoinRightColumns(
    const std::vector & mapv, AddedColumns & added_columns, HashJoin::Type type, JoinStuff::JoinUsedFlags & used_flags)
{
    constexpr bool is_asof_join = STRICTNESS == JoinStrictness::Asof;
    switch (type)
    {
        case HashJoin::Type::EMPTY: {
            if constexpr (!is_asof_join)
            {
                using KeyGetter = KeyGetterEmpty;
                std::vector key_getter_vector;
                key_getter_vector.emplace_back();

                using MapTypeVal = typename KeyGetter::MappedType;
                std::vector a_map_type_vector;
                a_map_type_vector.emplace_back();
                return joinRightColumnsSwitchNullability(
                    std::move(key_getter_vector), a_map_type_vector, added_columns, used_flags);
            }
            throw Exception(ErrorCodes::UNSUPPORTED_JOIN_KEYS, "Unsupported JOIN keys. Type: {}", type);
        }
#define M(TYPE) \
    case HashJoin::Type::TYPE: { \
        using MapTypeVal = const typename std::remove_reference_t::element_type; \
        using KeyGetter = typename KeyGetterForType::Type; \
        std::vector a_map_type_vector(mapv.size()); \
        std::vector key_getter_vector; \
        for (size_t d = 0; d < added_columns.join_on_keys.size(); ++d) \
        { \
            const auto & join_on_key = added_columns.join_on_keys[d]; \
            a_map_type_vector[d] = mapv[d]->TYPE.get(); \
            key_getter_vector.push_back( \
                std::move(createKeyGetter(join_on_key.key_columns, join_on_key.key_sizes))); \
        } \
        return joinRightColumnsSwitchNullability(std::move(key_getter_vector), a_map_type_vector, added_columns, used_flags); \
    }
            APPLY_FOR_JOIN_VARIANTS(M)
#undef M

        default:
            throw Exception(ErrorCodes::UNSUPPORTED_JOIN_KEYS, "Unsupported JOIN keys (type: {})", type);
    }
}

/// Branches on the runtime flag added_columns.need_filter and forwards to joinRightColumnsSwitchMultipleDisjuncts.
///
/// NOTE(review): the two calls below look identical in this copy; presumably they differ in an explicit
/// template argument (a compile-time need_filter value) that is missing here — confirm against the
/// original and do not merge the branches.
template
template
size_t HashJoinMethods::joinRightColumnsSwitchNullability(
    std::vector && key_getter_vector, const std::vector & mapv, AddedColumns & added_columns, JoinStuff::JoinUsedFlags & used_flags)
{
    if (added_columns.need_filter)
    {
        return joinRightColumnsSwitchMultipleDisjuncts(
            std::forward>(key_getter_vector), mapv, added_columns, used_flags);
    }

    return joinRightColumnsSwitchMultipleDisjuncts(
        std::forward>(key_getter_vector), mapv, added_columns, used_flags);
}

/// Chooses the probing routine:
///  - when the map type is "maps all" and an additional filter expression is set, joinRightColumnsWithAddtitionalFilter
///    is used; per-row used flags are requested for RIGHT/FULL joins or when there is more than one map;
///  - an additional filter expression with any other map type is a LOGICAL_ERROR;
///  - otherwise joinRightColumns is called, selected by whether there is more than one map.
///
/// NOTE(review): `need_filter` is not a function parameter here, so it is presumably a template parameter
/// whose declaration is missing from this copy. Likewise the two joinRightColumns calls in the ternary
/// presumably differ in a stripped template argument (per-row flags on/off) — confirm.
template
template
size_t HashJoinMethods::joinRightColumnsSwitchMultipleDisjuncts(
    std::vector && key_getter_vector, const std::vector & mapv, AddedColumns & added_columns, JoinStuff::JoinUsedFlags & used_flags)
{
    constexpr JoinFeatures join_features;
    if constexpr (join_features.is_maps_all)
    {
        if (added_columns.additional_filter_expression)
        {
            bool mark_per_row_used = join_features.right || join_features.full || mapv.size() > 1;
            return joinRightColumnsWithAddtitionalFilter(
                std::forward>(key_getter_vector), mapv, added_columns, used_flags, need_filter, mark_per_row_used);
        }
    }

    if (added_columns.additional_filter_expression)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Additional filter expression is not supported for this JOIN");

    return mapv.size() > 1 ? joinRightColumns(
               std::forward>(key_getter_vector), mapv, added_columns, used_flags)
                           : joinRightColumns(
                               std::forward>(key_getter_vector), mapv, added_columns, used_flags);
}

/// Joins right table columns which indexes are present in right_indexes using specified map.
/// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS).
/// Probes the hash maps for every left row and appends the matching right rows to `added_columns`.
///
/// For each left row `i` every ON clause is tried in order: the clause is skipped when its null map flags
/// the row, and the row counts as "not found" for the clause when isRowFiltered(i) is true. What happens on
/// a match depends on the compile-time join features (ASOF / ALL / ANY / SEMI / ANTI, see the branches).
/// When no clause matched, addNotFoundRow is called (for ANTI LEFT the filter entry is set first).
///
/// With need_replication the loop stops once `current_offset` reaches max_joined_block_rows.
/// Returns the number of left rows processed, which can therefore be smaller than added_columns.rows_to_add.
///
/// NOTE(review): template parameter lists and explicit template arguments are missing from this copy of
/// the file (e.g. after `std::make_unique`, `setUsed`, `KnownRowsHolder`); `need_filter` and `flag_per_row`
/// are used in `if constexpr`, so they are presumably template parameters — confirm.
template
template
size_t HashJoinMethods::joinRightColumns(
    std::vector && key_getter_vector, const std::vector & mapv, AddedColumns & added_columns, JoinStuff::JoinUsedFlags & used_flags)
{
    constexpr JoinFeatures join_features;

    size_t rows = added_columns.rows_to_add;
    /// The filter starts as all zeros; setUsed() flips entries to 1 for rows that are kept.
    if constexpr (need_filter)
        added_columns.filter = IColumn::Filter(rows, 0);
    if constexpr (!flag_per_row && (STRICTNESS == JoinStrictness::All || (STRICTNESS == JoinStrictness::Semi && KIND == JoinKind::Right)))
        added_columns.output_by_row_list = true;

    Arena pool;

    if constexpr (join_features.need_replication)
        added_columns.offsets_to_replicate = std::make_unique(rows);

    IColumn::Offset current_offset = 0;
    size_t max_joined_block_rows = added_columns.max_joined_block_rows;
    size_t i = 0;
    for (; i < rows; ++i)
    {
        if constexpr (join_features.need_replication)
        {
            if (unlikely(current_offset >= max_joined_block_rows))
            {
                /// Row limit reached: shrink the per-row arrays to the rows handled so far and stop.
                /// NOTE(review): `filter` is resized here even when it was not allocated above
                /// (need_filter false) — confirm this is intended.
                added_columns.offsets_to_replicate->resize(i);
                added_columns.filter.resize(i);
                break;
            }
        }

        bool right_row_found = false;

        KnownRowsHolder known_rows;
        for (size_t onexpr_idx = 0; onexpr_idx < added_columns.join_on_keys.size(); ++onexpr_idx)
        {
            const auto & join_keys = added_columns.join_on_keys[onexpr_idx];
            if (join_keys.null_map && (*join_keys.null_map)[i])
                continue;

            /// A filtered-out row gets a default-constructed FindResult, i.e. it is treated as not found.
            bool row_acceptable = !join_keys.isRowFiltered(i);
            using FindResult = typename KeyGetter::FindResult;
            auto find_result = row_acceptable ? key_getter_vector[onexpr_idx].findKey(*(mapv[onexpr_idx]), i, pool) : FindResult();

            if (find_result.isFound())
            {
                right_row_found = true;
                auto & mapped = find_result.getMapped();
                if constexpr (join_features.is_asof_join)
                {
                    const IColumn & left_asof_key = added_columns.leftAsofKey();

                    /// The key matched, but a suitable ASOF row may still be absent; that case is
                    /// reported through addNotFoundRow even though right_row_found is already true.
                    auto row_ref = mapped->findAsof(left_asof_key, i);
                    if (row_ref && row_ref->block)
                    {
                        setUsed(added_columns.filter, i);
                        if constexpr (flag_per_row)
                            used_flags.template setUsed(row_ref->block, row_ref->row_num, 0);
                        else
                            used_flags.template setUsed(find_result);

                        added_columns.appendFromBlock(row_ref, join_features.add_missing);
                    }
                    else
                        addNotFoundRow(added_columns, current_offset);
                }
                else if constexpr (join_features.is_all_join)
                {
                    setUsed(added_columns.filter, i);
                    used_flags.template setUsed(find_result);
                    auto used_flags_opt = join_features.need_flags ? &used_flags : nullptr;
                    addFoundRowAll(mapped, added_columns, current_offset, known_rows, used_flags_opt);
                }
                else if constexpr ((join_features.is_any_join || join_features.is_semi_join) && join_features.right)
                {
                    /// Use first appeared left key + it needs left columns replication
                    bool used_once = used_flags.template setUsedOnce(find_result);
                    if (used_once)
                    {
                        auto used_flags_opt = join_features.need_flags ? &used_flags : nullptr;
                        setUsed(added_columns.filter, i);
                        addFoundRowAll(mapped, added_columns, current_offset, known_rows, used_flags_opt);
                    }
                }
                else if constexpr (join_features.is_any_join && join_features.inner)
                {
                    bool used_once = used_flags.template setUsedOnce(find_result);

                    /// Use first appeared left key only
                    if (used_once)
                    {
                        setUsed(added_columns.filter, i);
                        added_columns.appendFromBlock(&mapped, join_features.add_missing);
                    }

                    /// Stop at the first clause that matched, whether or not the row was used.
                    break;
                }
                else if constexpr (join_features.is_any_join && join_features.full)
                {
                    /// TODO
                }
                else if constexpr (join_features.is_anti_join)
                {
                    /// A matched row is not emitted for ANTI joins; only the used flag is recorded when needed.
                    if constexpr (join_features.right && join_features.need_flags)
                        used_flags.template setUsed(find_result);
                }
                else /// ANY LEFT, SEMI LEFT, old ANY (RightAny)
                {
                    setUsed(added_columns.filter, i);
                    used_flags.template setUsed(find_result);
                    added_columns.appendFromBlock(&mapped, join_features.add_missing);

                    if (join_features.is_any_or_semi_join)
                    {
                        break;
                    }
                }
            }
        }

        if (!right_row_found)
        {
            if constexpr (join_features.is_anti_join && join_features.left)
                setUsed(added_columns.filter, i);
            addNotFoundRow(added_columns, current_offset);
        }

        if constexpr (join_features.need_replication)
        {
            (*added_columns.offsets_to_replicate)[i] = current_offset;
        }
    }

    added_columns.applyLazyDefaults();
    return i;
}

/// Sets filter[pos] to 1 when `need_filter` holds at compile time; otherwise does nothing.
template
template
void HashJoinMethods::setUsed(IColumn::Filter & filter [[maybe_unused]], size_t pos [[maybe_unused]])
{
    if constexpr (need_filter)
        filter[pos] = 1;
}

/// Evaluates added_columns.additional_filter_expression over the candidate rows of one batch and returns the
/// resulting mask column (NULLs converted to zeros).
///
/// The input block for the expression is assembled from:
///  - required columns found in the right block (taken from the first selected row's block as a sample),
///    with one row per entry of `selected_rows`;
///  - required columns found in added_columns.left_block, where the left row `left_start_row + i - 1` is
///    repeated `row_replicate_offset[i] - row_replicate_offset[i - 1]` times.
///
/// Early results:
///  - no selected rows: an empty UInt8 column;
///  - the sample right block is empty or there is no expression: selected_rows.size() ones;
///  - the expression requires no columns: it is executed on an empty block and column 0 is resized to
///    selected_rows.size();
///  - none of the required columns is present in the right block: an empty UInt8 column.
///
/// Throws LOGICAL_ERROR when a column of the assembled block has a null column or type pointer.
template
template
ColumnPtr HashJoinMethods::buildAdditionalFilter(
    size_t left_start_row, const std::vector & selected_rows, const std::vector & row_replicate_offset, AddedColumns & added_columns)
{
    ColumnPtr result_column;
    /// `do { ... } while (false)` is used so that `break` can jump to the common post-processing below.
    do
    {
        if (selected_rows.empty())
        {
            result_column = ColumnUInt8::create();
            break;
        }
        const Block & sample_right_block = *((*selected_rows.begin())->block);
        if (!sample_right_block || !added_columns.additional_filter_expression)
        {
            auto filter = ColumnUInt8::create();
            filter->insertMany(1, selected_rows.size());
            result_column = std::move(filter);
            break;
        }

        auto required_cols = added_columns.additional_filter_expression->getRequiredColumnsWithTypes();
        if (required_cols.empty())
        {
            Block block;
            added_columns.additional_filter_expression->execute(block);
            result_column = block.getByPosition(0).column->cloneResized(selected_rows.size());
            break;
        }
        NameSet required_column_names;
        for (auto & col : required_cols)
            required_column_names.insert(col.name);

        /// Right-side inputs: gather the value of each required column for every selected row.
        /// The column position found in the sample block is reused for the block of every selected row.
        Block executed_block;
        size_t right_col_pos = 0;
        for (const auto & col : sample_right_block.getColumnsWithTypeAndName())
        {
            if (required_column_names.contains(col.name))
            {
                auto new_col = col.column->cloneEmpty();
                for (const auto & selected_row : selected_rows)
                {
                    const auto & src_col = selected_row->block->getByPosition(right_col_pos);
                    new_col->insertFrom(*src_col.column, selected_row->row_num);
                }
                executed_block.insert({std::move(new_col), col.type, col.name});
            }
            right_col_pos += 1;
        }
        if (!executed_block)
        {
            result_column = ColumnUInt8::create();
            break;
        }

        /// Left-side inputs: repeat each left row once per candidate right row it produced.
        for (const auto & col_name : required_column_names)
        {
            const auto * src_col = added_columns.left_block.findByName(col_name);
            if (!src_col)
                continue;
            auto new_col = src_col->column->cloneEmpty();
            size_t prev_left_offset = 0;
            for (size_t i = 1; i < row_replicate_offset.size(); ++i)
            {
                const size_t & left_offset = row_replicate_offset[i];
                size_t rows = left_offset - prev_left_offset;
                if (rows)
                    new_col->insertManyFrom(*src_col->column, left_start_row + i - 1, rows);
                prev_left_offset = left_offset;
            }
            executed_block.insert({std::move(new_col), src_col->type, col_name});
        }
        /// NOTE(review): this check appears unreachable — an empty `executed_block` already left the
        /// loop above and the left-side loop only inserts columns. Confirm before relying on it.
        if (!executed_block)
        {
            throw Exception(
                ErrorCodes::LOGICAL_ERROR,
                "required columns: [{}], but not found any in left/right table. right table: {}, left table: {}",
                required_cols.toString(),
                sample_right_block.dumpNames(),
                added_columns.left_block.dumpNames());
        }

        for (const auto & col : executed_block.getColumnsWithTypeAndName())
            if (!col.column || !col.type)
                throw Exception(ErrorCodes::LOGICAL_ERROR, "Illegal nullptr column in input block: {}", executed_block.dumpStructure());

        /// The result is read from position 0 of the block after execution.
        added_columns.additional_filter_expression->execute(executed_block);
        result_column = executed_block.getByPosition(0).column->convertToFullColumnIfConst();
        executed_block.clear();
    } while (false);

    result_column = result_column->convertToFullIfNeeded();
    if (result_column->isNullable())
    {
        /// Convert Nullable(UInt8) to UInt8 ensuring that nulls are zeros
        /// Trying to avoid copying data, since we are the only owner of the column.
        ColumnPtr mask_column = assert_cast(*result_column).getNullMapColumnPtr();

        MutableColumnPtr mutable_column;
        {
            /// `result_column` is released before mutate() so that the nested column has no other owner.
            ColumnPtr nested_column = assert_cast(*result_column).getNestedColumnPtr();
            result_column.reset();
            mutable_column = IColumn::mutate(std::move(nested_column));
        }

        auto & column_data = assert_cast(*mutable_column).getData();
        const auto & mask_column_data = assert_cast(*mask_column).getData();
        for (size_t i = 0; i < column_data.size(); ++i)
        {
            if (mask_column_data[i])
                column_data[i] = 0;
        }
        return mutable_column;
    }
    return result_column;
}

/// Variant of the probing loop used when an additional (mixed) filter expression has to be applied.
///
/// Left rows are handled in batches. For each batch:
///  1. `collect_keys_matched_rows_refs` gathers every key-matched right row into `selected_rows` and records,
///     per left row, the running count of candidates in `row_replicate_offset`;
///  2. buildAdditionalFilter evaluates the expression over those candidates;
///  3. `copy_final_matched_rows` appends the candidates that passed the expression, updates used flags, the
///     filter and offsets_to_replicate, and calls addNotFoundRow for left rows without a surviving match.
///
/// Returns the number of left rows consumed (`left_row_iter`).
/// Throws LOGICAL_ERROR when the batch bookkeeping sizes disagree.
///
/// NOTE(review): here `need_filter` and `flag_per_row` are runtime parameters, unlike in joinRightColumns
/// where they are used in `if constexpr`.
template
template
size_t HashJoinMethods::joinRightColumnsWithAddtitionalFilter(
    std::vector && key_getter_vector,
    const std::vector & mapv,
    AddedColumns & added_columns,
    JoinStuff::JoinUsedFlags & used_flags [[maybe_unused]],
    bool need_filter [[maybe_unused]],
    bool flag_per_row [[maybe_unused]])
{
    constexpr JoinFeatures join_features;
    size_t left_block_rows = added_columns.rows_to_add;
    if (need_filter)
        added_columns.filter = IColumn::Filter(left_block_rows, 0);

    std::unique_ptr pool;

    if constexpr (join_features.need_replication)
        added_columns.offsets_to_replicate = std::make_unique(left_block_rows);

    std::vector row_replicate_offset;
    row_replicate_offset.reserve(left_block_rows);

    using FindResult = typename KeyGetter::FindResult;
    size_t max_joined_block_rows = added_columns.max_joined_block_rows;
    size_t left_row_iter = 0;
    PreSelectedRows selected_rows;
    selected_rows.reserve(left_block_rows);
    std::vector find_results;
    find_results.reserve(left_block_rows);
    bool exceeded_max_block_rows = false;
    /// total_added_rows counts rows appended to the output over all batches;
    /// current_added_rows counts candidate rows collected in the current batch only.
    IColumn::Offset total_added_rows = 0;
    IColumn::Offset current_added_rows = 0;

    /// Step 1: starting at `left_row_iter`, collect key-matched candidates for as many left rows as allowed.
    /// All per-batch state is reset first and `pool` is re-created.
    /// row_replicate_offset[0] is 0 and entry k holds the candidate count after the k-th left row of the batch.
    auto collect_keys_matched_rows_refs = [&]()
    {
        pool = std::make_unique();
        find_results.clear();
        row_replicate_offset.clear();
        row_replicate_offset.push_back(0);
        current_added_rows = 0;
        selected_rows.clear();
        for (; left_row_iter < left_block_rows; ++left_row_iter)
        {
            if constexpr (join_features.need_replication)
            {
                if (unlikely(total_added_rows + current_added_rows >= max_joined_block_rows))
                {
                    break;
                }
            }
            KnownRowsHolder all_flag_known_rows;
            KnownRowsHolder single_flag_know_rows;
            for (size_t join_clause_idx = 0; join_clause_idx < added_columns.join_on_keys.size(); ++join_clause_idx)
            {
                const auto & join_keys = added_columns.join_on_keys[join_clause_idx];
                if (join_keys.null_map && (*join_keys.null_map)[left_row_iter])
                    continue;

                bool row_acceptable = !join_keys.isRowFiltered(left_row_iter);
                auto find_result = row_acceptable
                    ? key_getter_vector[join_clause_idx].findKey(*(mapv[join_clause_idx]), left_row_iter, *pool)
                    : FindResult();

                if (find_result.isFound())
                {
                    auto & mapped = find_result.getMapped();
                    find_results.push_back(find_result);
                    /// We don't add missing in addFoundRowAll here. we will add it after filter is applied.
                    /// it's different from `joinRightColumns`.
                    if (flag_per_row)
                        addFoundRowAll(mapped, selected_rows, current_added_rows, all_flag_known_rows, nullptr);
                    else
                        addFoundRowAll(mapped, selected_rows, current_added_rows, single_flag_know_rows, nullptr);
                }
            }
            row_replicate_offset.push_back(current_added_rows);
        }
    };

    /// Step 3: walk the batch left row by left row; candidates of the i-th left row occupy the index range
    /// [row_replicate_offset[i - 1], row_replicate_offset[i]) of `filter_flags` / `selected_rows`.
    auto copy_final_matched_rows = [&](size_t left_start_row, ColumnPtr filter_col)
    {
        const PaddedPODArray & filter_flags = assert_cast(*filter_col).getData();

        size_t prev_replicated_row = 0;
        auto selected_right_row_it = selected_rows.begin();
        size_t find_result_index = 0;
        for (size_t i = 1, n = row_replicate_offset.size(); i < n; ++i)
        {
            bool any_matched = false;
            /// right/full join or multiple disjuncts, we need to mark used flags for each row.
            if (flag_per_row)
            {
                for (size_t replicated_row = prev_replicated_row; replicated_row < row_replicate_offset[i]; ++replicated_row)
                {
                    if (filter_flags[replicated_row])
                    {
                        if constexpr (join_features.is_semi_join || join_features.is_any_join)
                        {
                            /// For LEFT/INNER SEMI/ANY JOIN, we need to add only first appeared row from left,
                            if constexpr (join_features.left || join_features.inner)
                            {
                                if (!any_matched)
                                {
                                    // For inner join, we need to mark each right row's flag, because we only use each right row once.
                                    auto used_once = used_flags.template setUsedOnce(
                                        (*selected_right_row_it)->block, (*selected_right_row_it)->row_num, 0);
                                    if (used_once)
                                    {
                                        any_matched = true;
                                        total_added_rows += 1;
                                        added_columns.appendFromBlock(*selected_right_row_it, join_features.add_missing);
                                    }
                                }
                            }
                            else
                            {
                                auto used_once = used_flags.template setUsedOnce(
                                    (*selected_right_row_it)->block, (*selected_right_row_it)->row_num, 0);
                                if (used_once)
                                {
                                    any_matched = true;
                                    total_added_rows += 1;
                                    added_columns.appendFromBlock(*selected_right_row_it, join_features.add_missing);
                                }
                            }
                        }
                        else if constexpr (join_features.is_anti_join)
                        {
                            any_matched = true;
                            if constexpr (join_features.right && join_features.need_flags)
                                used_flags.template setUsed((*selected_right_row_it)->block, (*selected_right_row_it)->row_num, 0);
                        }
                        else
                        {
                            any_matched = true;
                            total_added_rows += 1;
                            added_columns.appendFromBlock(*selected_right_row_it, join_features.add_missing);
                            used_flags.template setUsed((*selected_right_row_it)->block, (*selected_right_row_it)->row_num, 0);
                        }
                    }

                    ++selected_right_row_it;
                }
            }
            else
            {
                for (size_t replicated_row = prev_replicated_row; replicated_row < row_replicate_offset[i]; ++replicated_row)
                {
                    if constexpr (join_features.is_anti_join)
                    {
                        /// Only the fact of a match matters here; the iterator is not advanced in this branch.
                        any_matched |= filter_flags[replicated_row];
                    }
                    else if constexpr (join_features.need_replication)
                    {
                        if (filter_flags[replicated_row])
                        {
                            any_matched = true;
                            added_columns.appendFromBlock(*selected_right_row_it, join_features.add_missing);
                            total_added_rows += 1;
                        }
                        ++selected_right_row_it;
                    }
                    else
                    {
                        if (filter_flags[replicated_row])
                        {
                            any_matched = true;
                            added_columns.appendFromBlock(*selected_right_row_it, join_features.add_missing);
                            total_added_rows += 1;
                            /// Only the first passing candidate is taken; jump the iterator past the
                            /// remaining candidates of this left row before leaving the loop.
                            selected_right_row_it = selected_right_row_it + row_replicate_offset[i] - replicated_row;
                            break;
                        }
                        ++selected_right_row_it;
                    }
                }
            }


            if constexpr (join_features.is_anti_join)
            {
                if (!any_matched)
                {
                    if constexpr (join_features.left)
                        if (need_filter)
                            setUsed(added_columns.filter, left_start_row + i - 1);
                    addNotFoundRow(added_columns, total_added_rows);
                }
            }
            else
            {
                if (!any_matched)
                {
                    addNotFoundRow(added_columns, total_added_rows);
                }
                else
                {
                    if (!flag_per_row)
                        used_flags.template setUsed(find_results[find_result_index]);
                    if (need_filter)
                        setUsed(added_columns.filter, left_start_row + i - 1);
                    if constexpr (join_features.add_missing)
                        added_columns.applyLazyDefaults();
                }
            }
            /// Advance by one when this left row had at least one candidate.
            /// NOTE(review): this presumably relies on at most one entry of `find_results` per left row
            /// whenever `find_results` is read (the !flag_per_row case) — confirm.
            find_result_index += (prev_replicated_row != row_replicate_offset[i]);

            if constexpr (join_features.need_replication)
            {
                (*added_columns.offsets_to_replicate)[left_start_row + i - 1] = total_added_rows;
            }
            prev_replicated_row = row_replicate_offset[i];
        }
    };

    while (left_row_iter < left_block_rows && !exceeded_max_block_rows)
    {
        auto left_start_row = left_row_iter;
        collect_keys_matched_rows_refs();
        /// Sanity check: one selected row per counted candidate and one offset per consumed left row (plus the leading 0).
        if (selected_rows.size() != current_added_rows || row_replicate_offset.size() != left_row_iter - left_start_row + 1)
        {
            throw Exception(
                ErrorCodes::LOGICAL_ERROR,
                "Sizes are mismatched. selected_rows.size:{}, current_added_rows:{}, row_replicate_offset.size:{}, left_row_iter: {}, "
                "left_start_row: {}",
                selected_rows.size(),
                current_added_rows,
                row_replicate_offset.size(),
                left_row_iter,
                left_start_row);
        }
        auto filter_col = buildAdditionalFilter(left_start_row, selected_rows, row_replicate_offset, added_columns);
        copy_final_matched_rows(left_start_row, filter_col);

        if constexpr (join_features.need_replication)
        {
            // Add a check for current_added_rows to avoid running the filter expression on a too small batch.
            if (total_added_rows >= max_joined_block_rows || current_added_rows < 1024)
                exceeded_max_block_rows = true;
        }
    }

    if constexpr (join_features.need_replication)
    {
        /// Shrink the per-row arrays to the number of left rows actually consumed.
        /// NOTE(review): `filter` is resized here even when `need_filter` is false and it was never
        /// allocated above — confirm this is intended.
        added_columns.offsets_to_replicate->resize_assume_reserved(left_row_iter);
        added_columns.filter.resize_assume_reserved(left_row_iter);
    }
    added_columns.applyLazyDefaults();
    return left_row_iter;
}

/// Splits `block` at row `num_rows`: every column of `block` is cut in place to rows [0, num_rows) and a
/// new block holding the rows from `num_rows` to the end is returned.
/// When `num_rows` is not smaller than the row count, `block` is left untouched and an empty Block is returned.
template
Block HashJoinMethods::sliceBlock(Block & block, size_t num_rows)
{
    size_t total_rows = block.rows();
    if (num_rows >= total_rows)
        return {};
    size_t remaining_rows = total_rows - num_rows;
    Block remaining_block = block.cloneEmpty();
    for (size_t i = 0; i < block.columns(); ++i)
    {
        auto & col = block.getByPosition(i);
        /// The tail must be taken before the source column is replaced by its head.
        remaining_block.getByPosition(i).column = col.column->cut(num_rows, remaining_rows);
        col.column = col.column->cut(0, num_rows);
    }
    return remaining_block;
}

/// Produces the right key column named `renamed_right_column` from a copy of `left_column`.
///
/// When `null_map_filter` is given it is applied through JoinCommon::filterWithBlanks and is also passed to
/// correctNullabilityInplace. Nullability is then adjusted to match `right_key_type`, the column is cast with
/// castColumnAccurate if the types still differ, and constants are converted to full columns.
template
ColumnWithTypeAndName HashJoinMethods::copyLeftKeyColumnToRight(
    const DataTypePtr & right_key_type,
    const String & renamed_right_column,
    const ColumnWithTypeAndName & left_column,
    const IColumn::Filter * null_map_filter)
{
    ColumnWithTypeAndName right_column = left_column;
    right_column.name = renamed_right_column;

    if (null_map_filter)
        right_column.column = JoinCommon::filterWithBlanks(right_column.column, *null_map_filter);

    bool should_be_nullable = isNullableOrLowCardinalityNullable(right_key_type);
    if (null_map_filter)
        correctNullabilityInplace(right_column, should_be_nullable, *null_map_filter);
    else
        correctNullabilityInplace(right_column, should_be_nullable);

    if (!right_column.type->equals(*right_key_type))
    {
        right_column.column = castColumnAccurate(right_column, right_key_type);
        right_column.type = right_key_type;
    }

    right_column.column = right_column.column->convertToFullColumnIfConst();
    return right_column;
}

/// Makes `column` nullable or non-nullable according to `nullable`.
/// When nullability is removed, the column is first passed through JoinCommon::filterWithBlanks with its
/// own null map (see the comment inside the branch).
template
void HashJoinMethods::correctNullabilityInplace(ColumnWithTypeAndName & column, bool nullable)
{
    if (nullable)
    {
        JoinCommon::convertColumnToNullable(column);
    }
    else
    {
        /// We have to replace values masked by NULLs with defaults.
        if (column.column)
            if (const auto * nullable_column = checkAndGetColumn(&*column.column))
                column.column = JoinCommon::filterWithBlanks(column.column, nullable_column->getNullMapColumn().getData(), true);

        JoinCommon::removeColumnNullability(column);
    }
}

/// Overload that additionally takes `negative_null_map`.
/// When `nullable` is true the column is converted to nullable and, if the resulting type is nullable and
/// the map is not empty, applyNegatedNullMap(negative_null_map) is called on it.
/// When `nullable` is false only the nullability is removed; unlike the two-argument overload, values are
/// not blanked first.
template
void HashJoinMethods::correctNullabilityInplace(
    ColumnWithTypeAndName & column, bool nullable, const IColumn::Filter & negative_null_map)
{
    if (nullable)
    {
        JoinCommon::convertColumnToNullable(column);
        if (column.type->isNullable() && !negative_null_map.empty())
        {
            /// mutate() takes ownership of the column, which is moved back after the null map is updated.
            MutableColumnPtr mutable_column = IColumn::mutate(std::move(column.column));
            assert_cast(*mutable_column).applyNegatedNullMap(negative_null_map);
            column.column = std::move(mutable_column);
        }
    }
    else
        JoinCommon::removeColumnNullability(column);
}
}