#pragma once #include "config.h" #if USE_LIBPQXX #include #include #include #include #include #include #include namespace DB { template class PostgreSQLSource : public ISource { public: PostgreSQLSource( postgres::ConnectionHolderPtr connection_holder_, const String & query_str_, const Block & sample_block, UInt64 max_block_size_); String getName() const override { return "PostgreSQL"; } ~PostgreSQLSource() override; protected: PostgreSQLSource( std::shared_ptr tx_, const std::string & query_str_, const Block & sample_block, UInt64 max_block_size_, bool auto_commit_); Status prepare() override; Chunk generate() override; void onStart(); void onFinish(); private: void init(const Block & sample_block); const UInt64 max_block_size; bool auto_commit = true; ExternalResultDescription description; bool started = false; bool is_completed = false; postgres::ConnectionHolderPtr connection_holder; std::unordered_map array_info; protected: String query_str; /// tx and stream must be destroyed before connection_holder. std::shared_ptr tx; std::unique_ptr stream; }; /// Passes transaction object into PostgreSQLSource and does not close transaction after read is finished. template class PostgreSQLTransactionSource : public PostgreSQLSource { public: using Base = PostgreSQLSource; PostgreSQLTransactionSource( std::shared_ptr tx_, const std::string & query_str_, const Block & sample_block_, const UInt64 max_block_size_) : PostgreSQLSource(tx_, query_str_, sample_block_, max_block_size_, false) {} }; } #endif