#pragma once #include "config.h" #if USE_NURAFT #include #include #include "IServer.h" #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { struct SocketInterruptablePollWrapper; using SocketInterruptablePollWrapperPtr = std::unique_ptr; struct RequestWithResponse { Coordination::ZooKeeperResponsePtr response; Coordination::ZooKeeperRequestPtr request; /// it can be nullptr for some responses }; using ThreadSafeResponseQueue = ConcurrentBoundedQueue; using ThreadSafeResponseQueuePtr = std::shared_ptr; struct LastOp; using LastOpMultiVersion = MultiVersion; using LastOpPtr = LastOpMultiVersion::Version; class KeeperTCPHandler : public Poco::Net::TCPServerConnection { public: static void registerConnection(KeeperTCPHandler * conn); static void unregisterConnection(KeeperTCPHandler * conn); /// dump all connections statistics static void dumpConnections(WriteBufferFromOwnString & buf, bool brief); static void resetConnsStats(); private: static std::mutex conns_mutex; /// all connections static std::unordered_set connections; public: KeeperTCPHandler( const Poco::Util::AbstractConfiguration & config_ref, std::shared_ptr keeper_dispatcher_, Poco::Timespan receive_timeout_, Poco::Timespan send_timeout_, const Poco::Net::StreamSocket & socket_); void run() override; KeeperConnectionStats & getConnectionStats(); void dumpStats(WriteBufferFromOwnString & buf, bool brief); void resetStats(); ~KeeperTCPHandler() override; private: LoggerPtr log; std::shared_ptr keeper_dispatcher; Poco::Timespan operation_timeout; Poco::Timespan min_session_timeout; Poco::Timespan max_session_timeout; Poco::Timespan session_timeout; int64_t session_id{-1}; Stopwatch session_stopwatch; SocketInterruptablePollWrapperPtr poll_wrapper; Poco::Timespan send_timeout; Poco::Timespan receive_timeout; ThreadSafeResponseQueuePtr responses; Coordination::XID close_xid = Coordination::CLOSE_XID; bool use_xid_64 = false; /// Streams for reading/writing from/to client connection socket. std::optional in; std::optional out; std::optional compressed_in; std::optional compressed_out; std::atomic connected{false}; void runImpl(); WriteBuffer & getWriteBuffer(); void flushWriteBuffer(); ReadBuffer & getReadBuffer(); void sendHandshake(bool has_leader, bool & use_compression); Poco::Timespan receiveHandshake(int32_t handshake_length, bool & use_compression); static bool isHandShake(int32_t handshake_length); bool tryExecuteFourLetterWordCmd(int32_t command); std::pair receiveRequest(); void packageSent(); void packageReceived(); void updateStats(Coordination::ZooKeeperResponsePtr & response, const Coordination::ZooKeeperRequestPtr & request); Poco::Timestamp established; using Operations = std::unordered_map; Operations operations; LastOpMultiVersion last_op; KeeperConnectionStats conn_stats; }; } #endif