#pragma once #include "MaterializedPostgreSQLConsumer.h" #include #include #include namespace DB { struct MaterializedPostgreSQLSettings; class StorageMaterializedPostgreSQL; struct SettingChange; class PostgreSQLReplicationHandler : WithContext { friend class TemporaryReplicationSlot; public: using ConsumerPtr = std::shared_ptr; PostgreSQLReplicationHandler( const String & postgres_database_, const String & postgres_table_, const String & clickhouse_database_, const String & clickhouse_uuid_, const postgres::ConnectionInfo & connection_info_, ContextPtr context_, bool is_attach_, const MaterializedPostgreSQLSettings & replication_settings, bool is_materialized_postgresql_database_); /// Activate task to be run from a separate thread: wait until connection is available and call startReplication(). void startup(bool delayed); /// Stop replication without cleanup. void shutdown(); /// Clean up replication: remove publication and replication slots. void shutdownFinal(); /// Add storage pointer to let handler know which tables it needs to keep in sync. void addStorage(const std::string & table_name, StorageMaterializedPostgreSQL * storage); /// Fetch list of tables which are going to be replicated. Used for database engine. std::set fetchRequiredTables(); /// Start replication setup immediately. void startSynchronization(bool throw_on_error); ASTPtr getCreateNestedTableQuery(StorageMaterializedPostgreSQL * storage, const String & table_name); void addTableToReplication(StorageMaterializedPostgreSQL * storage, const String & postgres_table_name); void removeTableFromReplication(const String & postgres_table_name); void setSetting(const SettingChange & setting); Strings getTableAllowedColumns(const std::string & table_name) const; void cleanupFunc(); private: using MaterializedStorages = std::unordered_map; /// Methods to manage Publication. bool isPublicationExist(pqxx::nontransaction & tx); void createPublicationIfNeeded(pqxx::nontransaction & tx); std::set fetchTablesFromPublication(pqxx::work & tx); void dropPublication(pqxx::nontransaction & ntx); void addTableToPublication(pqxx::nontransaction & ntx, const String & table_name); void removeTableFromPublication(pqxx::nontransaction & ntx, const String & table_name); /// Methods to manage Replication Slots. bool isReplicationSlotExist(pqxx::nontransaction & tx, String & start_lsn, bool temporary = false); void createReplicationSlot(pqxx::nontransaction & tx, String & start_lsn, String & snapshot_name, bool temporary = false); void dropReplicationSlot(pqxx::nontransaction & tx, bool temporary = false); /// Methods to manage replication. void checkConnectionAndStart(); void consumerFunc(); ConsumerPtr getConsumer(); StorageInfo loadFromSnapshot(postgres::Connection & connection, std::string & snapshot_name, const String & table_name, StorageMaterializedPostgreSQL * materialized_storage); template PostgreSQLTableStructurePtr fetchTableStructure(T & tx, const String & table_name) const; String doubleQuoteWithSchema(const String & table_name) const; std::pair getSchemaAndTableName(const String & table_name) const; void assertInitialized() const; LoggerPtr log; /// If it is not attach, i.e. a create query, then if publication already exists - always drop it. bool is_attach; String postgres_database; String postgres_schema; String current_database_name; /// Connection string and address for logs. postgres::ConnectionInfo connection_info; /// max_block_size for replication stream. const size_t max_block_size; /// To distinguish whether current replication handler belongs to a MaterializedPostgreSQL database engine or single storage. bool is_materialized_postgresql_database; /// A coma-separated list of tables, which are going to be replicated for database engine. By default, a whole database is replicated. String tables_list; String schema_list; /// Schema can be as a part of table name, i.e. as a clickhouse table it is accessed like db.`schema.table`. /// This is possible to allow replicating tables from multiple schemas in the same MaterializedPostgreSQL database engine. mutable bool schema_as_a_part_of_table_name = false; const bool user_managed_slot; const String user_provided_snapshot; const String replication_slot; const String tmp_replication_slot; const String publication_name; /// Replication consumer. Manages decoding of replication stream and syncing into tables. ConsumerPtr consumer; BackgroundSchedulePool::TaskHolder startup_task; BackgroundSchedulePool::TaskHolder consumer_task; BackgroundSchedulePool::TaskHolder cleanup_task; const UInt64 reschedule_backoff_min_ms; const UInt64 reschedule_backoff_max_ms; const UInt64 reschedule_backoff_factor; UInt64 milliseconds_to_wait; std::atomic stop_synchronization = false; /// MaterializedPostgreSQL tables. Used for managing all operations with its internal nested tables. MaterializedStorages materialized_storages; bool replication_handler_initialized = false; }; }