#pragma once
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
namespace DB
{
/** The engine that uses the merge tree (see MergeTreeData) and is replicated through ZooKeeper.
*
* ZooKeeper is used for the following things:
* - the structure of the table (/metadata, /columns)
* - action log with data (/log/log-...,/replicas/replica_name/queue/queue-...);
* - a replica list (/replicas), and replica activity tag (/replicas/replica_name/is_active), replica addresses (/replicas/replica_name/host);
* - the leader replica election (/leader_election) - these are the replicas that assign merges, mutations
* and partition manipulations.
* (after ClickHouse version 20.5 we allow multiple leaders to act concurrently);
* - a set of parts of data on each replica (/replicas/replica_name/parts);
* - list of the last N blocks of data with checksum, for deduplication (/blocks);
* - the list of incremental block numbers (/block_numbers) that we are about to insert,
* to ensure the linear order of data insertion and data merge only on the intervals in this sequence;
* - coordinate writes with quorum (/quorum).
* - Storage of mutation entries (ALTER DELETE, ALTER UPDATE etc.) to execute (/mutations).
* See comments in StorageReplicatedMergeTree::mutate() for details.
*/
/** The replicated tables have a common log (/log/log-...).
* Log - a sequence of entries (LogEntry) about what to do.
* Each entry is one of:
* - normal data insertion (GET),
* - data insertion with a possible attach from local data (ATTACH),
* - merge (MERGE),
* - delete the partition (DROP).
*
* Each replica copies (queueUpdatingTask, pullLogsToQueue) entries from the log to its queue (/replicas/replica_name/queue/queue-...)
* and then executes them (queueTask).
* Despite the name of the "queue", execution can be reordered, if necessary (shouldExecuteLogEntry, executeLogEntry).
* In addition, the records in the queue can be generated independently (not from the log), in the following cases:
* - when creating a new replica, actions are put on GET from other replicas (createReplica);
* - if the part is corrupt (removePartAndEnqueueFetch) or absent during the check
* (at start - checkParts, while running - searchForMissingPart), actions are put on GET from other replicas;
*
* The replica to which INSERT was made in the queue will also have an entry of the GET of this data.
* Such an entry is considered to be executed as soon as the queue handler sees it.
*
* The log entry has a creation time. This time is generated by the clock of server that created entry
* - the one on which the corresponding INSERT or ALTER query came.
*
* For the entries in the queue that the replica made for itself,
* the creation time is the time at which the corresponding part was created on any of the replicas.
*/
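/* Illustrative sketch only, not part of the engine. It mirrors the log-to-queue flow described
   above with in-memory containers; in the real engine both the log and the queues are ZooKeeper
   nodes. SketchLogEntry, SketchReplica, pullLogToQueue, executeOne and the log_pointer field are
   hypothetical names introduced for this sketch, and the predicate passed to executeOne merely
   stands in for the kind of decision shouldExecuteLogEntry makes.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical in-memory stand-ins; the real log and queues live in ZooKeeper.
struct SketchLogEntry
{
    std::string type;      // "GET", "ATTACH", "MERGE" or "DROP"
    std::string part_name; // name of the part the action produces or removes
};

struct SketchReplica
{
    std::size_t log_pointer = 0;      // index of the first shared-log entry not yet copied
    std::deque<SketchLogEntry> queue; // this replica's private queue
};

// Copy every not-yet-seen entry from the shared log into the replica's queue.
// Returns the number of entries copied.
std::size_t pullLogToQueue(const std::vector<SketchLogEntry> & shared_log, SketchReplica & replica)
{
    std::size_t copied = 0;
    while (replica.log_pointer < shared_log.size())
    {
        replica.queue.push_back(shared_log[replica.log_pointer]);
        ++replica.log_pointer;
        ++copied;
    }
    return copied;
}

// Execute the first entry that is currently executable, so entries may run out of order.
// `can_execute` is a caller-supplied predicate (an assumption of this sketch).
template <typename Predicate>
bool executeOne(SketchReplica & replica, Predicate can_execute, std::vector<std::string> & executed)
{
    for (auto it = replica.queue.begin(); it != replica.queue.end(); ++it)
    {
        if (can_execute(*it))
        {
            executed.push_back(it->type + ":" + it->part_name);
            replica.queue.erase(it);
            return true;
        }
    }
    return false;
}
```

   Calling pullLogToQueue twice in a row copies nothing the second time, because the replica
   remembers how far it has read. executeOne skips entries that the predicate postpones, which
   is why the "queue" is not strictly first-in, first-out.
 */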
class ZooKeeperWithFaultInjection;
using ZooKeeperWithFaultInjectionPtr = std::shared_ptr<ZooKeeperWithFaultInjection>;
class StorageReplicatedMergeTree final : public MergeTreeData
{
public:
/** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table.
*/
StorageReplicatedMergeTree(
const TableZnodeInfo & zookeeper_info_,
LoadingStrictnessLevel mode,
const StorageID & table_id_,
const String & relative_data_path_,
const StorageInMemoryMetadata & metadata_,
ContextMutablePtr context_,
const String & date_column_name,
const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> settings_,
bool need_check_structure);
void startup() override;
/// Too many shutdown methods...
///
/// Partial shutdown is called if we lose the connection to ZooKeeper.
/// Table can also recover after partial shutdown and continue
/// to work. This method can be called regularly.
void partialShutdown();
/// These two methods are called during final table shutdown (DROP/DETACH/overall server shutdown).
/// The shutdown process is split into two methods to make it softer and faster. The database shutdown()
/// looks like:
/// for (table : tables)
/// table->flushAndPrepareForShutdown()
///
/// for (table : tables)
/// table->shutdown()
///
/// So we stop producing all the parts first for all tables (fast operation). And after we can wait in shutdown()
/// for other replicas to download parts.
///
/// In flushAndPrepareForShutdown we cancel all part-producing operations:
/// merges, fetches, moves and so on. If it wasn't called before shutdown() -- shutdown() will
/// call it (defensive programming).
void flushAndPrepareForShutdown() override;
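/* Illustrative sketch only, not part of the engine. It shows the two-phase shutdown order
   described above, including the defensive call from shutdown(). SketchTable, shutdownAll and
   the trace vector are hypothetical names introduced for this sketch.

```cpp
#include <string>
#include <vector>

// Hypothetical table type used only for this sketch.
class SketchTable
{
public:
    explicit SketchTable(std::vector<std::string> & trace_) : trace(trace_) {}

    // Phase 1: fast; stops everything that produces parts. Safe to call more than once.
    void flushAndPrepareForShutdown()
    {
        if (prepared)
            return;
        prepared = true;
        trace.push_back("prepare");
    }

    // Phase 2: may be slow; runs phase 1 itself if the caller skipped it.
    void shutdown()
    {
        flushAndPrepareForShutdown();
        trace.push_back("shutdown");
    }

private:
    std::vector<std::string> & trace; // records the order of calls
    bool prepared = false;
};

// Stop part production on all tables first, then do the potentially slow final shutdown.
void shutdownAll(std::vector<SketchTable> & tables)
{
    for (auto & table : tables)
        table.flushAndPrepareForShutdown();
    for (auto & table : tables)
        table.shutdown();
}
```

   With two tables, the recorded order is prepare, prepare, shutdown, shutdown: no table starts
   its slow phase while another one is still producing parts.
 */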
/// In shutdown we completely terminate table -- remove
/// is_active node and interserver handler. Also optionally
/// wait until other replicas download some parts from our replica.
void shutdown(bool is_drop) override;
~StorageReplicatedMergeTree() override;
std::string getName() const override { return "Replicated" + merging_params.getModeName() + "MergeTree"; }
bool supportsParallelInsert() const override { return true; }
bool supportsReplication() const override { return true; }
bool supportsDeduplication() const override { return true; }
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams) override;
std::optional<UInt64> totalRows(const Settings & settings) const override;
std::optional<UInt64> totalRowsByPartitionPredicate(const ActionsDAG & filter_actions_dag, ContextPtr context) const override;
std::optional<UInt64> totalBytes(const Settings & settings) const override;
std::optional<UInt64> totalBytesUncompressed(const Settings & settings) const override;
SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context, bool async_insert) override;
std::optional<QueryPipeline> distributedWrite(const ASTInsertQuery & /*query*/, ContextPtr /*context*/) override;
bool optimize(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const ASTPtr & partition,
bool final,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ContextPtr query_context) override;
void alter(const AlterCommands & commands, ContextPtr query_context, AlterLockHolder & table_lock_holder) override;
void mutate(const MutationCommands & commands, ContextPtr context) override;
void waitMutation(const String & znode_name, size_t mutations_sync) const;
std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
CancellationCode killMutation(const String & mutation_id) override;
bool hasLightweightDeletedMask() const override;
/** Removes a replica from ZooKeeper. If there are no other replicas, it deletes the entire table from ZooKeeper.
*/
void drop() override;
void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr query_context, TableExclusiveLockHolder &) override;
void checkTableCanBeRenamed(const StorageID & new_name) const override;
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;
void checkTableCanBeDropped([[ maybe_unused ]] ContextPtr query_context) const override;
ActionLock getActionLock(StorageActionBlockType action_type) override;
void onActionLockRemove(StorageActionBlockType action_type) override;
/// Wait till replication queue's current last entry is processed or till size becomes 0
/// If timeout is exceeded returns false
bool waitForProcessingQueue(UInt64 max_wait_milliseconds, SyncReplicaMode sync_mode, std::unordered_set<String> source_replicas);
/// Get the status of the table. If with_zk_fields = false - do not fill in the fields that require queries to ZK.
void getStatus(ReplicatedTableStatus & res, bool with_zk_fields = true);
using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>;
void getQueue(LogEntriesData & res, String & replica_name);
std::vector<PartMovesBetweenShardsOrchestrator::Entry> getPartMovesBetweenShardsEntries();
/// Get replica delay relative to current time.
time_t getAbsoluteDelay() const;
/// If the absolute delay is greater than min_relative_delay_to_measure,
/// will also calculate the difference from the unprocessed time of the best replica.
/// NOTE: Will communicate to ZooKeeper to calculate relative delay.
void getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay);
/// Add a part to the queue of parts whose data you want to check in the background thread.
void enqueuePartForCheck(const String & part_name, time_t delay_to_check_seconds = 0);
DataValidationTasksPtr getCheckTaskList(const CheckTaskFilter & check_task_filter, ContextPtr context) override;
std::optional<CheckResult> checkDataNext(DataValidationTasksPtr & check_task_list) override;
/// Checks whether adaptive granularity can be used
bool canUseAdaptiveGranularity() const override;
/// Modify a CREATE TABLE query to make a variant which must be written to a backup.
void adjustCreateQueryForBackup(ASTPtr & create_query) const override;
/// Makes backup entries to backup the data of the storage.
void backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional<ASTs> & partitions) override;
/// Extract data from the backup and put it to the storage.
void restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & partitions) override;
/** Remove a specific replica from zookeeper.
* returns true if there are no replicas left
*/
static bool dropReplica(zkutil::ZooKeeperPtr zookeeper, const TableZnodeInfo & zookeeper_info,
LoggerPtr logger, MergeTreeSettingsPtr table_settings = nullptr, std::optional<bool> * has_metadata_out = nullptr);
bool dropReplica(const String & drop_replica, LoggerPtr logger);
/// Removes table from ZooKeeper after the last replica was dropped
static bool removeTableNodesFromZooKeeper(
zkutil::ZooKeeperPtr zookeeper, const TableZnodeInfo & zookeeper_info2,
const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, LoggerPtr logger);
/// Schedules job to execute in background pool (merge, mutate, drop range and so on)
bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;
/// Checks that fetches are not disabled with action blocker and pool for fetches
/// is not overloaded
bool canExecuteFetch(const ReplicatedMergeTreeLogEntry & entry, String & disable_reason) const;
/// Fetch part only when it is stored on shared storage like S3
MutableDataPartPtr executeFetchShared(const String & source_replica, const String & new_part_name, const DiskPtr & disk, const String & path);
/// Lock the part in ZooKeeper so that its shared data can be used by several nodes
void lockSharedData(const IMergeTreeDataPart & part, bool replace_existing_lock, std::optional<HardlinkedFiles> hardlinked_files) const override;
void lockSharedData(
const IMergeTreeDataPart & part,
const ZooKeeperWithFaultInjectionPtr & zookeeper,
bool replace_existing_lock,
std::optional<HardlinkedFiles> hardlinked_files) const;
void getLockSharedDataOps(
const IMergeTreeDataPart & part,
const ZooKeeperWithFaultInjectionPtr & zookeeper,
bool replace_existing_lock,
std::optional<HardlinkedFiles> hardlinked_files,
Coordination::Requests & requests) const;
zkutil::EphemeralNodeHolderPtr lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const;
/// Unlock shared data part in zookeeper
/// Return true if data unlocked
/// Return false if data is still used by another node
std::pair<bool, NameSet> unlockSharedData(const IMergeTreeDataPart & part) const override;
std::pair<bool, NameSet>
unlockSharedData(const IMergeTreeDataPart & part, const ZooKeeperWithFaultInjectionPtr & zookeeper) const;
/// Unlock shared data part in zookeeper by part id
/// Return true if data unlocked
/// Return false if data is still used by another node
static std::pair<bool, NameSet> unlockSharedDataByID(
String part_id,
const String & table_uuid,
const MergeTreePartInfo & part_info,
const String & replica_name_,
const std::string & disk_type,
const ZooKeeperWithFaultInjectionPtr & zookeeper_,
const MergeTreeSettings & settings,
LoggerPtr logger,
const String & zookeeper_path_old,
MergeTreeDataFormatVersion data_format_version);
/// Fetch part only if some replica has it on shared storage like S3
MutableDataPartPtr tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override;
/// Get the best replica that has this part on a remote disk of the same type
String getSharedDataReplica(const IMergeTreeDataPart & part, const DataSourceDescription & data_source_description) const;
const String & getReplicaName() const { return replica_name; }
/// Restores table metadata if ZooKeeper lost it.
/// Used only on restarted readonly replicas (not checked). All active (Active) parts are moved to detached/
/// folder and attached. Parts in all other states are just moved to detached/ folder.
void restoreMetadataInZooKeeper();
/// Get throttler for replicated fetches
ThrottlerPtr getFetchesThrottler() const
{
return replicated_fetches_throttler;
}
/// Get throttler for replicated sends
ThrottlerPtr getSendsThrottler() const
{
return replicated_sends_throttler;
}
bool createEmptyPartInsteadOfLost(zkutil::ZooKeeperPtr zookeeper, const String & lost_part_name);
// Return default or custom zookeeper name for table
const String & getZooKeeperName() const { return zookeeper_info.zookeeper_name; }
const String & getZooKeeperPath() const { return zookeeper_info.path; }
const String & getFullZooKeeperPath() const { return zookeeper_info.full_path; }
// Return table id, common for different replicas
String getTableSharedID() const override;
std::map<int64_t, MutationCommands> getUnfinishedMutationCommands() const override;
/// Check if there are new broken disks and enqueue part recovery tasks.
void checkBrokenDisks();
static bool removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name, const String & table_uuid,
const String & replica_name, const String & zookeeper_path, const ContextPtr & local_context, const zkutil::ZooKeeperPtr & zookeeper);
bool canUseZeroCopyReplication() const;
bool isTableReadOnly () { return is_readonly || isStaticStorage(); }
std::optional<bool> hasMetadataInZooKeeper () { return has_metadata_in_zookeeper; }
/// Get a sequentially consistent view of current parts.
ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const;
void addLastSentPart(const MergeTreePartInfo & info);
/// Wait required amount of milliseconds to give other replicas a chance to
/// download unique parts from our replica
using ShutdownDeadline = std::chrono::time_point<std::chrono::system_clock>;
void waitForUniquePartsToBeFetchedByOtherReplicas(ShutdownDeadline shutdown_deadline);
private:
std::atomic_bool are_restoring_replica {false};
/// Delete old parts from disk and from ZooKeeper. Returns the number of removed parts
size_t clearOldPartsAndRemoveFromZK();
void clearOldPartsAndRemoveFromZKImpl(zkutil::ZooKeeperPtr zookeeper, DataPartsVector && parts);
template <bool async_insert>
friend class ReplicatedMergeTreeSinkImpl;
friend class ReplicatedMergeTreePartCheckThread;
friend class ReplicatedMergeTreeCleanupThread;
friend class AsyncBlockIDsCache;
friend class ReplicatedMergeTreeAlterThread;
friend class ReplicatedMergeTreeRestartingThread;
friend class ReplicatedMergeTreeAttachThread;
friend class ReplicatedMergeTreeMergeStrategyPicker;
friend struct ReplicatedMergeTreeLogEntry;
friend class ScopedPartitionMergeLock;
friend class ReplicatedMergeTreeQueue;
friend class PartMovesBetweenShardsOrchestrator;
friend class MergeTreeData;
friend class MergeFromLogEntryTask;
friend class MutateFromLogEntryTask;
friend class ReplicatedMergeMutateTaskBase;
using MergeStrategyPicker = ReplicatedMergeTreeMergeStrategyPicker;
using LogEntry = ReplicatedMergeTreeLogEntry;
using LogEntryPtr = LogEntry::Ptr;
using MergeTreeData::MutableDataPartPtr;
zkutil::ZooKeeperPtr current_zookeeper; /// Use only the methods below.
mutable std::mutex current_zookeeper_mutex; /// To recreate the session in the background thread.
zkutil::ZooKeeperPtr tryGetZooKeeper() const;
zkutil::ZooKeeperPtr getZooKeeper() const;
/// Get connection from global context and reconnect if needed.
/// NOTE: use it only when table is shut down, in all other cases
/// use getZooKeeper() because it is managed by restarting thread
/// which guarantees that we have only one connected object
/// for table.
zkutil::ZooKeeperPtr getZooKeeperIfTableShutDown() const;
zkutil::ZooKeeperPtr getZooKeeperAndAssertNotReadonly() const;
void setZooKeeper();
String getEndpointName() const;
/// If true, the table is offline and cannot be written to.
/// This flag is managed by RestartingThread.
std::atomic_bool is_readonly {true};
std::atomic_uint32_t readonly_start_time{0};
/// If nullopt - ZooKeeper is not available, so we don't know if there is table metadata.
/// If false - ZooKeeper is available, but there is no table metadata. It's safe to drop table in this case.
std::optional<bool> has_metadata_in_zookeeper;
bool is_readonly_metric_set = false;
const TableZnodeInfo zookeeper_info;
const String zookeeper_path; // shorthand for zookeeper_info.path
const String replica_name; // shorthand for zookeeper_info.replica_name
const String replica_path;
/** /replicas/me/is_active.
*/
zkutil::EphemeralNodeHolderPtr replica_is_active_node;
/** Is this replica "leading". The leader replica selects the parts to merge.
* It can be false only when old ClickHouse versions are working on the same cluster, because now we allow multiple leaders.
*/
std::atomic<bool> is_leader {false};
InterserverIOEndpointPtr data_parts_exchange_endpoint;
MergeTreeDataSelectExecutor reader;
MergeTreeDataWriter writer;
MergeTreeDataMergerMutator merger_mutator;
MergeStrategyPicker merge_strategy_picker;
/** The queue of what needs to be done on this replica to catch up with everyone. It is taken from ZooKeeper (/replicas/me/queue/).
* In ZK the entries are in chronological order; here that is not required.
*/
ReplicatedMergeTreeQueue queue;
std::atomic<time_t> last_queue_update_start_time{0};
std::atomic<time_t> last_queue_update_finish_time{0};
mutable std::mutex last_queue_update_exception_lock;
String last_queue_update_exception;
String getLastQueueUpdateException() const;
DataPartsExchange::Fetcher fetcher;
/// When activated, replica is initialized and startup() method could exit
Poco::Event startup_event;
/// Do I need to complete background threads (except restarting_thread)?
std::atomic<bool> partial_shutdown_called {false};
/// Event that is signalled (and is reset) by the restarting_thread when the ZooKeeper session expires.
Poco::Event partial_shutdown_event {false}; /// Poco::Event::EVENT_MANUALRESET
std::atomic<bool> shutdown_called {false};
std::atomic<bool> shutdown_prepared_called {false};
std::optional<ShutdownDeadline> shutdown_deadline;
/// We call flushAndPrepareForShutdown before acquiring DDLGuard, so we can shutdown a table that is being created right now
mutable std::mutex flush_and_shutdown_mutex;
mutable std::mutex last_sent_parts_mutex;
std::condition_variable last_sent_parts_cv;
std::deque<MergeTreePartInfo> last_sent_parts;
/// Threads.
///
/// A task that keeps track of the updates in the logs of all replicas and loads them into the queue.
bool queue_update_in_progress = false;
BackgroundSchedulePool::TaskHolder queue_updating_task;
BackgroundSchedulePool::TaskHolder mutations_updating_task;
Coordination::WatchCallbackPtr mutations_watch_callback;
/// A task that selects parts to merge.
BackgroundSchedulePool::TaskHolder merge_selecting_task;
/// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query.
std::mutex merge_selecting_mutex;
UInt64 merge_selecting_sleep_ms;
/// A task that marks finished mutations as done.
BackgroundSchedulePool::TaskHolder mutations_finalizing_task;
/// A thread that removes old parts, log entries, and blocks.
ReplicatedMergeTreeCleanupThread cleanup_thread;
AsyncBlockIDsCache async_block_ids_cache;
/// A thread that checks the data of the parts, as well as the queue of the parts to be checked.
ReplicatedMergeTreePartCheckThread part_check_thread;
/// A thread that processes reconnection to ZooKeeper when the session expires.
ReplicatedMergeTreeRestartingThread restarting_thread;
EventNotifier::HandlerPtr session_expired_callback_handler;
/// A thread that attaches the table using ZooKeeper
std::optional<ReplicatedMergeTreeAttachThread> attach_thread;
PartMovesBetweenShardsOrchestrator part_moves_between_shards_orchestrator;
std::atomic<bool> initialization_done{false};
/// True if replica was created for existing table with fixed granularity
bool other_replicas_fixed_granularity = false;
/// Throttlers used in DataPartsExchange to lower maximum fetch/sends
/// speed.
ThrottlerPtr replicated_fetches_throttler;
ThrottlerPtr replicated_sends_throttler;
/// Global ID, synced via ZooKeeper between replicas
mutable std::mutex table_shared_id_mutex;
mutable UUID table_shared_id;
std::mutex last_broken_disks_mutex;
std::set<String> last_broken_disks;
std::mutex existing_zero_copy_locks_mutex;
struct ZeroCopyLockDescription
{
std::string replica;
std::shared_ptr<std::atomic<bool>> exists;
};
std::unordered_map<String, ZeroCopyLockDescription> existing_zero_copy_locks;
static std::optional<QueryPipeline> distributedWriteFromClusterStorage(const std::shared_ptr<IStorageCluster> & src_storage_cluster, const ASTInsertQuery & query, ContextPtr context);
void readLocalImpl(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
size_t max_block_size,
size_t num_streams);
void readLocalSequentialConsistencyImpl(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
size_t max_block_size,
size_t num_streams);
void readParallelReplicasImpl(
QueryPlan & query_plan,
const Names & column_names,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage);
template <class Func>
void foreachActiveParts(Func && func, bool select_sequential_consistency) const;
/** Creates the minimum set of nodes in ZooKeeper and creates the first replica.
* Returns true if the table was created, false if it already exists.
*/
bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot);
/**
* Creates a replica in ZooKeeper and adds to the queue all that it takes to catch up with the rest of the replicas.
*/
void createReplica(const StorageMetadataPtr & metadata_snapshot);
/** Create nodes in ZK that must always exist, but that might be missing if older versions of the server are running.
*/
void createNewZooKeeperNodes();
bool checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot, bool strict_check = true);
/// A part of ALTER: apply metadata changes only (data parts are altered separately).
/// Must be called under IStorage::lockForAlter() lock.
void setTableStructure(
const StorageID & table_id, const ContextPtr & local_context,
ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff,
int32_t new_metadata_version);
/** Check that the set of parts corresponds to that in ZK (/replicas/me/parts/).
* If any parts described in ZK are missing locally, throw an exception.
* If any local parts are not mentioned in ZK, remove them.
* But if there are too many, throw an exception just in case - it's probably a configuration error.
*/
void checkParts(bool skip_sanity_checks);
bool checkPartsImpl(bool skip_sanity_checks);
/// Synchronize the list of part uuids which are currently pinned. These should be sent to root query executor
/// to be used for deduplication.
void syncPinnedPartUUIDs();
/** Check that the part's checksum is the same as the checksum of the same part on some other replica.
* If no other replica has such a part, nothing is checked.
* Not very reliable: if two replicas add a part almost at the same time, no checks will occur.
* Adds actions to `ops` that add data about the part into ZooKeeper.
* Call under lockForShare.
*/
bool checkPartChecksumsAndAddCommitOps(
const ZooKeeperWithFaultInjectionPtr & zookeeper,
const DataPartPtr & part,
Coordination::Requests & ops,
String part_name,
NameSet & absent_replicas_paths);
String getChecksumsForZooKeeper(const MergeTreeDataPartChecksums & checksums) const;
bool getOpsToCheckPartChecksumsAndCommit(const ZooKeeperWithFaultInjectionPtr & zookeeper, const MutableDataPartPtr & part,
std::optional<HardlinkedFiles> hardlinked_files, bool replace_zero_copy_lock,
Coordination::Requests & ops, size_t & num_check_ops);
/// Accepts a PreActive part, atomically checks its checksums against those on other replicas and commits the part
DataPartsVector checkPartChecksumsAndCommit(Transaction & transaction, const MutableDataPartPtr & part, std::optional<HardlinkedFiles> hardlinked_files = {}, bool replace_zero_copy_lock=false);
bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;
void getCommitPartOps(Coordination::Requests & ops, const DataPartPtr & part, const String & block_id_path = "") const;
void getCommitPartOps(Coordination::Requests & ops, const DataPartPtr & part, const std::vector<String> & block_id_paths) const;
/// Adds actions to `ops` that remove a part from ZooKeeper.
/// Set has_children to true for "old-style" parts (those with /columns and /checksums child znodes).
void getRemovePartFromZooKeeperOps(const String & part_name, Coordination::Requests & ops, bool has_children);
/// Quickly removes big set of parts from ZooKeeper (using async multi queries)
void removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names,
NameSet * parts_should_be_retried = nullptr);
/// Remove parts from ZooKeeper, throw exception if unable to do so after max_retries.
void removePartsFromZooKeeperWithRetries(const Strings & part_names, size_t max_retries = 5);
void removePartsFromZooKeeperWithRetries(PartsToRemoveFromZooKeeper & parts, size_t max_retries = 5);
void forcefullyRemoveBrokenOutdatedPartFromZooKeeperBeforeDetaching(const String & part_name) override;
void paranoidCheckForCoveredPartsInZooKeeperOnStart(const Strings & parts_in_zk, const Strings & parts_to_fetch) const;
/// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts.
void removePartAndEnqueueFetch(const String & part_name, bool storage_init);
/// Running jobs from the queue.
/** Execute the action from the queue. Throws an exception if something is wrong.
* Returns whether or not it succeeds. If it did not work, write it to the end of the queue.
*/
bool executeLogEntry(LogEntry & entry);
/// Lookup the part for the entry in the detached/ folder.
/// returns nullptr if the part is corrupt or missing.
MutableDataPartPtr attachPartHelperFoundValidPart(const LogEntry& entry) const;
void executeDropRange(const LogEntry & entry);
/// Execute alter of table metadata. Set replica/metadata and replica/columns
/// nodes in zookeeper and also changes in memory metadata.
/// New metadata and columns values stored in entry.
bool executeMetadataAlter(const LogEntry & entry);
/// Fetch part from other replica (inserted or merged/mutated)
/// NOTE: Attention! First of all tries to find covering part on other replica
/// and set it into entry.actual_new_part_name. After that tries to fetch this new covering part.
/// If fetch was not successful, clears entry.actual_new_part_name.
bool executeFetch(LogEntry & entry, bool need_to_check_missing_part=true);
bool executeReplaceRange(LogEntry & entry);
void executeClonePartFromShard(const LogEntry & entry);
/** Updates the queue.
*/
void queueUpdatingTask();
void mutationsUpdatingTask();
/** Clone data from another replica.
* If replica can not be cloned throw Exception.
*/
void cloneReplica(const String & source_replica, Coordination::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper);
/// Repairs metadata of a stale replica. Called from cloneReplica(...)
void cloneMetadataIfNeeded(const String & source_replica, const String & source_path, zkutil::ZooKeeperPtr & zookeeper);
/// Clone replica if it is lost.
void cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zookeeper);
ReplicatedMergeTreeQueue::SelectedEntryPtr selectQueueEntry();
MergeFromLogEntryTaskPtr getTaskToProcessMergeQueueEntry(ReplicatedMergeTreeQueue::SelectedEntryPtr entry);
bool processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntryPtr entry);
/// Start being leader (if not disabled by setting).
/// Since multi-leaders are allowed, it just sets is_leader flag.
void startBeingLeader();
void stopBeingLeader();
/** Selects the parts to merge and writes to the log.
*/
void mergeSelectingTask();
/// Checks if some mutations are done and marks them as done.
void mutationsFinalizingTask();
/** Write the selected parts to merge into the log,
* Call when merge_selecting_mutex is locked.
* Returns false if any part is not in ZK.
*/
enum class CreateMergeEntryResult : uint8_t { Ok, MissingPart, LogUpdated, Other };
CreateMergeEntryResult createLogEntryToMergeParts(
zkutil::ZooKeeperPtr & zookeeper,
const DataPartsVector & parts,
const String & merged_name,
const UUID & merged_part_uuid,
const MergeTreeDataPartFormat & merged_part_format,
bool deduplicate,
const Names & deduplicate_by_columns,
bool cleanup,
ReplicatedMergeTreeLogEntryData * out_log_entry,
int32_t log_version,
MergeType merge_type);
CreateMergeEntryResult createLogEntryToMutatePart(
const IMergeTreeDataPart & part,
const UUID & new_part_uuid,
Int64 mutation_version,
int32_t alter_version,
int32_t log_version);
/** Returns an empty string if no one has a part.
*/
String findReplicaHavingPart(const String & part_name, bool active);
static String findReplicaHavingPart(const String & part_name, const String & zookeeper_path_, zkutil::ZooKeeper::Ptr zookeeper_);
bool checkReplicaHavePart(const String & replica, const String & part_name);
bool checkIfDetachedPartExists(const String & part_name);
bool checkIfDetachedPartitionExists(const String & partition_name);
/** Find replica having specified part or any part that covers it.
* If active = true, consider only active replicas.
* If found, returns replica name and set 'entry->actual_new_part_name' to name of found largest covering part.
* If not found, returns empty string.
*/
String findReplicaHavingCoveringPart(LogEntry & entry, bool active);
bool findReplicaHavingCoveringPart(const String & part_name, bool active);
String findReplicaHavingCoveringPartImplLowLevel(LogEntry * entry, const String & part_name, String & found_part_name, bool active);
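/* Illustrative sketch only, not part of the engine. It shows one way to pick the "largest
   covering part" mentioned above from a list of candidate parts. SketchPartInfo and
   findLargestCoveringPart are hypothetical names, and the containment rule is a simplified
   assumption (same partition, enclosing block range, level not lower); mutation versions and
   replica activity are ignored here.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

// Simplified, hypothetical stand-in for a part descriptor.
struct SketchPartInfo
{
    std::string partition_id;
    std::int64_t min_block = 0;
    std::int64_t max_block = 0;
    std::uint32_t level = 0;

    // A part covers another one if it is in the same partition and its block range encloses the other's.
    bool contains(const SketchPartInfo & rhs) const
    {
        return partition_id == rhs.partition_id
            && min_block <= rhs.min_block
            && max_block >= rhs.max_block
            && level >= rhs.level;
    }
};

// Among `candidates`, return the index of the largest part that covers `wanted`, if any.
std::optional<std::size_t> findLargestCoveringPart(
    const SketchPartInfo & wanted, const std::vector<SketchPartInfo> & candidates)
{
    std::optional<std::size_t> best;
    for (std::size_t i = 0; i < candidates.size(); ++i)
    {
        if (!candidates[i].contains(wanted))
            continue;
        // Prefer a candidate that also covers the best one found so far.
        if (!best || candidates[i].contains(candidates[*best]))
            best = i;
    }
    return best;
}
```

   An empty result plays the role of the "not found" case, for which the declarations above
   return an empty string.
 */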
static std::set<MergeTreePartInfo> findReplicaUniqueParts(const String & replica_name_, const String & zookeeper_path_, MergeTreeDataFormatVersion format_version_, zkutil::ZooKeeper::Ptr zookeeper_, LoggerPtr log_);
/** Download the specified part from the specified replica.
* If `to_detached`, the part is placed in the `detached` directory.
* If quorum != 0, then the node for tracking the quorum is updated.
* Returns false if the part is already being fetched.
*/
bool fetchPart(
const String & part_name,
const StorageMetadataPtr & metadata_snapshot,
const String & source_zookeeper_name,
const String & source_replica_path,
bool to_detached,
size_t quorum,
zkutil::ZooKeeper::Ptr zookeeper_ = nullptr,
bool try_fetch_shared = true);
/** Download the specified part from the specified replica.
* Used to replace a local part with the same S3-shared part in hybrid storage.
* Returns nullptr if the part is already being fetched.
*/
MutableDataPartPtr fetchExistsPart(
const String & part_name,
const StorageMetadataPtr & metadata_snapshot,
const String & replica_path,
DiskPtr replaced_disk,
String replaced_part_path);
/// Required only to avoid races between executeLogEntry and fetchPartition
std::unordered_set<String> currently_fetching_parts;
std::mutex currently_fetching_parts_mutex;
/// With the quorum being tracked, add a replica to the quorum for the part.
void updateQuorum(const String & part_name, bool is_parallel);
/// Deletes info from quorum/last_part node for particular partition_id.
void cleanLastPartNode(const String & partition_id);
/// Part name is stored in quorum/last_part for corresponding partition_id.
bool partIsLastQuorumPart(const MergeTreePartInfo & part_info) const;
/// The part is currently being inserted with parallel quorum (the node quorum/parallel/part_name exists).
bool partIsInsertingWithParallelQuorum(const MergeTreePartInfo & part_info) const;
/// Creates a new block number if a block with the given block_id does not exist yet.
/// If zookeeper_path_prefix is specified, the block number is allocated under that path
/// (can be used to allocate blocks on other replicas).
std::optional allocateBlockNumber(
const String & partition_id, const zkutil::ZooKeeperPtr & zookeeper,
const String & zookeeper_block_id_path = "", const String & zookeeper_path_prefix = "") const;
template <typename T>
std::optional allocateBlockNumber(
const String & partition_id,
const ZooKeeperWithFaultInjectionPtr & zookeeper,
const T & zookeeper_block_id_path,
const String & zookeeper_path_prefix = "") const;
/** Wait until all replicas, including this, execute the specified action from the log.
* If a replica is added concurrently, this method may not wait for the newly added replica.
*
* Waits for inactive replicas no longer than wait_for_inactive_timeout.
* Returns the list of inactive replicas that have not executed the entry, or throws an exception.
*
* NOTE: This method must be called without the table lock held,
* because it effectively waits for another thread that usually also has to acquire the lock to proceed, which would lead to a deadlock.
*/
void waitForAllReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry,
Int64 wait_for_inactive_timeout, const String & error_context = {});
Strings tryWaitForAllReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry,
Int64 wait_for_inactive_timeout);
/** Wait until the specified replica executes the specified action from the log.
* NOTE: See comment about locks above.
*/
bool tryWaitForReplicaToProcessLogEntry(const String & table_zookeeper_path, const String & replica_name,
const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout = 0);
/// Depending on settings, do nothing, wait for this replica, or wait for all replicas to process the log entry.
void waitForLogEntryToBeProcessedIfNecessary(const ReplicatedMergeTreeLogEntryData & entry, ContextPtr query_context, const String & error_context = {});
/// Throw an exception if the table is readonly.
void assertNotReadonly() const;
/// Produce an imaginary part info covering all parts in the specified partition (at the call moment).
/// Returns false if the partition doesn't exist yet.
/// Caller must hold delimiting_block_lock until creation of drop/replace entry in log.
/// Otherwise some replica may assign merge which intersects part_info.
bool getFakePartCoveringAllPartsInPartition(const String & partition_id, MergeTreePartInfo & part_info,
std::optional & delimiting_block_lock, bool for_replace_range = false);
/// Check whether a node exists in ZK. If it does, remember this fact and answer true immediately on subsequent calls.
mutable std::unordered_set<std::string> existing_nodes_cache;
mutable std::mutex existing_nodes_cache_mutex;
bool existsNodeCached(const ZooKeeperWithFaultInjectionPtr & zookeeper, const std::string & path) const;
/// Cancels INSERTs in the block range by removing ephemeral block numbers
void clearLockedBlockNumbersInPartition(zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);
void getClearBlocksInPartitionOps(Coordination::Requests & ops, zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);
void getClearBlocksInPartitionOpsImpl(Coordination::Requests & ops, zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num, const String & blocks_dir_name);
/// Remove block IDs from `blocks/` in ZooKeeper for the given partition ID in the given block number range.
void clearBlocksInPartition(
zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);
/// Info about how other replicas can access this one.
ReplicatedMergeTreeAddress getReplicatedMergeTreeAddress() const;
bool addOpsToDropAllPartsInPartition(
zkutil::ZooKeeper & zookeeper, const String & partition_id, bool detach,
Coordination::Requests & ops, std::vector & entries,
std::vector & delimiting_block_locks,
std::vector & log_entry_ops_idx);
void dropAllPartsInPartitions(
zkutil::ZooKeeper & zookeeper, const Strings & partition_ids, std::vector & entries, ContextPtr query_context, bool detach);
LogEntryPtr dropAllPartsInPartition(
zkutil::ZooKeeper & zookeeper, const String & partition_id, ContextPtr query_context, bool detach);
void dropAllPartitionsImpl(const zkutil::ZooKeeperPtr & zookeeper, bool detach, ContextPtr query_context);
void dropPartNoWaitNoThrow(const String & part_name) override;
void dropPart(const String & part_name, bool detach, ContextPtr query_context) override;
// Partition helpers
void dropPartition(const ASTPtr & partition, bool detach, ContextPtr query_context) override;
PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, ContextPtr query_context) override;
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr query_context) override;
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) override;
void movePartitionToShard(const ASTPtr & partition, bool move_part, const String & to, ContextPtr query_context) override;
CancellationCode killPartMoveToShard(const UUID & task_uuid) override;
void fetchPartition(
const ASTPtr & partition,
const StorageMetadataPtr & metadata_snapshot,
const String & from,
bool fetch_part,
ContextPtr query_context) override;
void forgetPartition(const ASTPtr & partition, ContextPtr query_context) override;
/// NOTE: there are no guarantees with respect to concurrent merges. The part being
/// dropped can be concurrently merged into some covering part, in which case
/// dropPart will do nothing. This is a fundamental limitation, but it is acceptable
/// for the reasons below.
///
/// dropPart is used in the following cases:
/// 1) Remove empty parts after TTL.
/// 2) Remove parts after move between shards.
/// 3) User queries: ALTER TABLE DROP PART 'part_name'.
///
/// In the first case, merging an empty part is even better than DROP. In the
/// second case, part UUIDs are used to forbid merges of moving parts, so there
/// is no problem with concurrent merges. The third case is quite rare and
/// we give only a very weak guarantee: there will be no active part with this
/// name, but it may have been merged into some other part.
///
/// NOTE: don't rely on dropPart if you absolutely need to remove a non-empty part
/// and don't use any explicit locking mechanism for merges.
bool dropPartImpl(zkutil::ZooKeeperPtr & zookeeper, String part_name, LogEntry & entry, bool detach, bool throw_if_noop);
/// Check the granularity of an already existing replicated table in ZooKeeper, if it exists.
/// Returns true if the granularity is fixed.
bool checkFixedGranularityInZookeeper();
/// Wait, up to a timeout in seconds, until the mutation is finished on the given replicas.
void waitMutationToFinishOnReplicas(
const Strings & replicas, const String & mutation_id) const;
MutationsSnapshotPtr getMutationsSnapshot(const IMutationsSnapshot::Params & params) const override;
void startBackgroundMovesIfNeeded() override;
/// Attaches restored parts to the storage.
void attachRestoredParts(MutableDataPartsVector && parts) override;
std::unique_ptr getDefaultSettings() const override;
PartitionBlockNumbersHolder allocateBlockNumbersInAffectedPartitions(
const MutationCommands & commands, ContextPtr query_context, const zkutil::ZooKeeperPtr & zookeeper) const;
static Strings getZeroCopyPartPath(const MergeTreeSettings & settings, const std::string & disk_type, const String & table_uuid,
const String & part_name, const String & zookeeper_path_old);
static void createZeroCopyLockNode(
const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node,
int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false,
const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {});
static void getZeroCopyLockNodeCreateOps(
const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node, Coordination::Requests & requests,
int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false,
const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {});
bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name) override;
/// Create freeze metadata for the table and save it in ZooKeeper. Required only if zero-copy replication is enabled.
void createAndStoreFreezeMetadata(DiskPtr disk, DataPartPtr part, String backup_part_path) const override;
// Create table id if needed
void createTableSharedID() const;
bool checkZeroCopyLockExists(const String & part_name, const DiskPtr & disk, String & lock_replica);
void watchZeroCopyLock(const String & part_name, const DiskPtr & disk);
std::optional getZeroCopyPartPath(const String & part_name, const DiskPtr & disk);
/// Create an ephemeral lock in ZooKeeper for a part and a disk that supports zero-copy replication.
/// If there is no connection to ZooKeeper, or the table is shut down or readonly, return std::nullopt.
/// If somebody already holds the lock, return an unlocked ZeroCopyLock object (not std::nullopt).
std::optional tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk) override;
/// Wait for the ephemeral lock to disappear. Returns true if the table is shut down or readonly, the timeout is exceeded, etc.,
/// or if the node has actually disappeared.
bool waitZeroCopyLockToDisappear(const ZeroCopyLock & lock, size_t milliseconds_to_wait) override;
void startupImpl(bool from_attach_thread);
std::vector getZookeeperZeroCopyLockPaths() const;
static void dropZookeeperZeroCopyLockPaths(zkutil::ZooKeeperPtr zookeeper,
std::vector zero_copy_locks_paths, LoggerPtr logger);
struct DataValidationTasks : public IStorage::DataValidationTasksBase
{
explicit DataValidationTasks(DataPartsVector && parts_, std::unique_lock<std::mutex> && parts_check_lock_)
: parts_check_lock(std::move(parts_check_lock_)), parts(std::move(parts_)), it(parts.begin())
{}
DataPartPtr next()
{
std::lock_guard lock(mutex);
if (it == parts.end())
return nullptr;
return *(it++);
}
size_t size() const override
{
std::lock_guard lock(mutex);
return std::distance(it, parts.end());
}
std::unique_lock<std::mutex> parts_check_lock;
mutable std::mutex mutex;
DataPartsVector parts;
DataPartsVector::const_iterator it;
};
const String TMP_PREFIX_REPLACE_PARTITION_FROM = "tmp_replace_from_";
std::unique_ptr replacePartitionFromImpl(
const Stopwatch & watch,
ProfileEventsScope & profile_events_scope,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData & src_data,
const String & partition_id,
const zkutil::ZooKeeperPtr & zookeeper,
bool replace,
const bool & zero_copy_enabled,
const bool & always_use_copy_instead_of_hardlinks,
const ContextPtr & query_context);
};
String getPartNamePossiblyFake(MergeTreeDataFormatVersion format_version, const MergeTreePartInfo & part_info);
/** Each part must be present in three places:
* 1. In RAM: data_parts, all_data_parts.
* 2. In the filesystem (FS): the directory with the table data.
* 3. In ZooKeeper (ZK).
*
* When a part is added, it must be added to all three places right away.
* This is done as follows:
* - [FS] first write the part into a temporary directory on the filesystem;
* - [FS] rename the temporary part to the result on the filesystem;
* - [RAM] immediately afterwards add it to the `data_parts`, and remove from `data_parts` any parts covered by this one;
* - [RAM] also create a `Transaction` object which, in case of an exception (in the next step),
* rolls back the changes made to `data_parts` (in the previous step);
* - [ZK] then send a transaction (multi) to add a part to ZooKeeper (and some more actions);
* - [FS, ZK] note that removal of the covered (old) parts from the filesystem, from ZooKeeper and from `all_data_parts`
* is delayed by a few minutes.
*
* There is no atomicity here.
* Atomicity could be achieved using undo/redo logs and a flag in `DataPart` that marks when it is completely ready.
* But that would be inconvenient: undo/redo logs would have to be written to ZK for each `Part`, increasing the already large number of interactions.
*
* Instead, we are forced to work in a situation where at any time
* (from another thread, or after a server restart), there may be an unfinished transaction.
* (Note: for this, the part must be in RAM.)
* Of these cases, the most frequent one is when the part is already in `data_parts` but not yet in ZooKeeper.
* This case must be distinguished from the case where the same situation results from some kind of damage to the state.
*
* This is done with an age threshold.
* If the part is young enough, its absence from ZooKeeper is interpreted optimistically: as if it simply has not had time to be added there yet,
* that is, the transaction has not been executed yet but soon will be.
* If the part is old, its absence from ZooKeeper is interpreted as an unfinished transaction that needs to be rolled back.
*
* PS. Perhaps it would be better to add a flag to DataPart indicating that the part has been inserted into ZK.
* But it is too easy to get confused about the consistency of such a flag.
*/
/// NOLINTNEXTLINE
#define MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER (5 * 60)
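/// Illustrative sketch only, not part of the original interface: one way the age threshold
/// described in the comment above could be applied to a part that is present locally but absent from ZooKeeper.
/// The enum, the function name and the parameters are hypothetical. The default of 5 * 60 seconds
/// mirrors MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER, and treating a part whose age is exactly
/// equal to the threshold as still "young" is an assumption.
enum class MissingInZooKeeperVerdictSketch
{
    WaitForPendingTransaction,      /// Young part: assume the ZooKeeper transaction simply has not been executed yet.
    RollBackUnfinishedTransaction,  /// Old part: treat its absence in ZooKeeper as an unfinished transaction.
};

/// Both timestamps are in seconds since an arbitrary common epoch.
constexpr MissingInZooKeeperVerdictSketch classifyPartMissingInZooKeeperSketch(
    long long part_modification_time, long long current_time, long long max_age_seconds = 5 * 60)
{
    return (part_modification_time + max_age_seconds < current_time)
        ? MissingInZooKeeperVerdictSketch::RollBackUnfinishedTransaction
        : MissingInZooKeeperVerdictSketch::WaitForPendingTransaction;
}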
}