#pragma once #include #include #include #include namespace ProfileEvents { extern const Event MergedUncompressedBytes; extern const Event MergedRows; extern const Event MutatedRows; extern const Event MutatedUncompressedBytes; } namespace DB { /** Progress callback. * What it should update: * - approximate progress * - amount of read rows * - various metrics * - time elapsed for current merge. */ /// Auxiliary struct that for each merge stage stores its current progress. /// A stage is: the horizontal stage + a stage for each gathered column (if we are doing a /// Vertical merge) or a mutation of a single part. During a single stage all rows are read. struct MergeStageProgress { explicit MergeStageProgress(Float64 weight_) : is_first(true) , weight(weight_) {} MergeStageProgress(Float64 initial_progress_, Float64 weight_) : initial_progress(initial_progress_), is_first(false), weight(weight_) {} Float64 initial_progress = 0.0; bool is_first; Float64 weight; UInt64 total_rows = 0; UInt64 rows_read = 0; }; class MergeProgressCallback { public: MergeProgressCallback( MergeListElement * merge_list_element_ptr_, UInt64 & watch_prev_elapsed_, MergeStageProgress & stage_) : merge_list_element_ptr(merge_list_element_ptr_) , watch_prev_elapsed(watch_prev_elapsed_) , stage(stage_) { updateWatch(); } MergeListElement * merge_list_element_ptr; UInt64 & watch_prev_elapsed; MergeStageProgress & stage; void updateWatch() { UInt64 watch_curr_elapsed = merge_list_element_ptr->watch.elapsed(); watch_prev_elapsed = watch_curr_elapsed; } void operator()(const Progress & value) { if (merge_list_element_ptr->is_mutation) updateProfileEvents(value, ProfileEvents::MutatedRows, ProfileEvents::MutatedUncompressedBytes); else updateProfileEvents(value, ProfileEvents::MergedRows, ProfileEvents::MergedUncompressedBytes); updateWatch(); merge_list_element_ptr->bytes_read_uncompressed += value.read_bytes; if (stage.is_first) merge_list_element_ptr->rows_read += value.read_rows; stage.total_rows += value.total_rows_to_read; stage.rows_read += value.read_rows; if (stage.total_rows > 0) { merge_list_element_ptr->progress.store( stage.initial_progress + stage.weight * stage.rows_read / stage.total_rows, std::memory_order_relaxed); } } private: void updateProfileEvents(const Progress & value, ProfileEvents::Event rows_event, ProfileEvents::Event bytes_event) const { ProfileEvents::increment(bytes_event, value.read_bytes); if (stage.is_first) ProfileEvents::increment(rows_event, value.read_rows); } }; }