#pragma once

// NOTE(review): in this copy of the header every #include directive below has lost its operand,
// and many template argument lists are missing (e.g. `std::vector`, `std::shared_ptr`,
// `std::function`, `std::optional`, `std::span`, `std::forward`, `std::is_convertible_v`,
// `FormatStringHelper`, `template struct MultiRequest`, `fmt::formatter`). It looks as if text
// between angle brackets was stripped somewhere upstream. The tokens are intentionally left
// exactly as found rather than guessed; restore them from version control before building.
#include
#include
#include
#include
#include
#include
#include
#include
#include

/** Generic interface for ZooKeeper-like services.
  * Possible examples are:
  * - ZooKeeper client itself;
  * - fake ZooKeeper client for testing;
  * - ZooKeeper emulation layer on top of Etcd, FoundationDB, whatever.
  */

namespace DB
{
namespace ErrorCodes
{
    /// DB-level error code passed to DB::Exception by the Coordination::Exception constructors defined inline below.
    extern const int KEEPER_EXCEPTION;
}
}

namespace Coordination
{

using namespace DB;

/// Access control entry of a node: a permission mask plus the (scheme, id) it applies to.
struct ACL
{
    /// Permission flags. Each flag is a distinct bit; All (0x1F) is the union of the five flags.
    static constexpr int32_t Read = 1;
    static constexpr int32_t Write = 2;
    static constexpr int32_t Create = 4;
    static constexpr int32_t Delete = 8;
    static constexpr int32_t Admin = 16;
    static constexpr int32_t All = 0x1F;

    /// Presumably a bitwise OR of the flags above.
    /// NOTE(review): no default member initializer, so it is indeterminate in a default-initialized ACL.
    int32_t permissions;
    String scheme;
    String id;

    /// Lexicographic ordering by (permissions, scheme, id).
    /// NOTE(review): `std::tuple(...)` deduces std::tuple<int32_t, String, String>, so both strings of
    /// both operands are copied on every comparison; `std::tie` would compare the same fields without copies.
    bool operator<(const ACL & other) const
    {
        return std::tuple(permissions, scheme, id) < std::tuple(other.permissions, other.scheme, other.id);
    }
};

using ACLs = std::vector;

/// Node metadata carried by several responses below (SetACL, GetACL, Exists, Get, Set, List, Reconfig).
/// All fields start at zero. The camelCase names are kept as-is and exempted from the naming lint via NOLINT.
struct Stat
{
    int64_t czxid{0};
    int64_t mzxid{0};
    int64_t ctime{0};
    int64_t mtime{0};
    int32_t version{0};
    int32_t cversion{0};
    int32_t aversion{0};
    int64_t ephemeralOwner{0}; /// NOLINT
    int32_t dataLength{0}; /// NOLINT
    int32_t numChildren{0}; /// NOLINT
    int64_t pzxid{0};

    /// Memberwise equality.
    bool operator==(const Stat &) const = default;
};

/// Result codes of coordination operations. All failures are negative; ZOK (0) means success.
enum class Error : int32_t
{
    ZOK = 0,

    /** System and server-side errors.
      * This value itself is never thrown by the server and should be used only to mark a range:
      * codes numerically below ZSYSTEMERROR (-1) and above ZAPIERROR (-100), i.e. -2 ... -99,
      * are system errors.
      */
    ZSYSTEMERROR = -1,

    ZRUNTIMEINCONSISTENCY = -2, /// A runtime inconsistency was found
    ZDATAINCONSISTENCY = -3,    /// A data inconsistency was found
    ZCONNECTIONLOSS = -4,       /// Connection to the server has been lost
    ZMARSHALLINGERROR = -5,     /// Error while marshalling or unmarshalling data
    ZUNIMPLEMENTED = -6,        /// Operation is unimplemented
    ZOPERATIONTIMEOUT = -7,     /// Operation timeout
    ZBADARGUMENTS = -8,         /// Invalid arguments
    ZINVALIDSTATE = -9,         /// Invalid zhandle state

    /** API errors.
      * This value itself is never thrown by the server and should be used only to mark a range:
      * codes numerically below ZAPIERROR (-100), i.e. -101 and lower, are API errors.
      */
    ZAPIERROR = -100,

    ZNONODE = -101,                     /// Node does not exist
    ZNOAUTH = -102,                     /// Not authenticated
    ZBADVERSION = -103,                 /// Version conflict
    ZNOCHILDRENFOREPHEMERALS = -108,    /// Ephemeral nodes may not have children
    ZNODEEXISTS = -110,                 /// The node already exists
    ZNOTEMPTY = -111,                   /// The node has children
    ZSESSIONEXPIRED = -112,             /// The session has been expired by the server
    ZINVALIDCALLBACK = -113,            /// Invalid callback specified
    ZINVALIDACL = -114,                 /// Invalid ACL specified
    ZAUTHFAILED = -115,                 /// Client authentication failed
    ZCLOSING = -116,                    /// ZooKeeper is closing
    ZNOTHING = -117,                    /// (not an error) no server responses to process
    ZSESSIONMOVED = -118,               /// Session moved to another server, so the operation is ignored
    ZNOTREADONLY = -119,                /// State-changing request was passed to a read-only server
};

/// Network errors and similar. You should reinitialize the ZooKeeper session in case of these errors.
bool isHardwareError(Error code);

/// Valid errors sent from the server about database state (like "no node").
/// Logical and authentication errors (like "bad arguments") are not included.
bool isUserError(Error code);

/// Human-readable description of `code`. Used by Exception::fromPath and by the fmt::formatter
/// specialization at the end of this file.
const char * errorMessage(Error code);


struct Request;
using RequestPtr = std::shared_ptr;
using Requests = std::vector;

/// Polymorphic base of all requests.
struct Request
{
    Request() = default;
    Request(const Request &) = default;
    Request & operator=(const Request &) = default;
    virtual ~Request() = default;

    /// Path the request addresses.
    virtual String getPath() const = 0;

    /// Hook for prefixing paths with a root path; the base implementation does nothing.
    virtual void addRootPath(const String & /* root_path */) {}

    /// Rough size estimate in bytes; the base returns 0. Overrides add string lengths and sizeof of
    /// fixed-size fields. ACL lists are counted as acls.size() * sizeof(ACL), which excludes the ACL
    /// strings' heap contents.
    virtual size_t bytesSize() const { return 0; }
};

struct Response;
using ResponsePtr = std::shared_ptr;
using Responses = std::vector;
using ResponseCallback = std::function;

/// Polymorphic base of all responses. `error` defaults to ZOK and `zxid` to 0.
struct Response
{
    Error error = Error::ZOK;
    int64_t zxid = 0;

    Response() = default;
    Response(const Response &) = default;
    Response & operator=(const Response &) = default;
    virtual ~Response() = default;

    /// Hook for stripping a root path from returned paths; the base implementation does nothing.
    virtual void removeRootPath(const String & /* root_path */) {}

    /// Rough size estimate in bytes; the base returns 0.
    virtual size_t bytesSize() const { return 0; }
};

/// Notification passed to watch callbacks.
/// `type` and `state` presumably hold Event and State values (declared further below).
struct WatchResponse : virtual Response
{
    int32_t type = 0;
    int32_t state = 0;
    String path;

    void removeRootPath(const String & root_path) override;

    size_t bytesSize() const override { return path.size() + sizeof(type) + sizeof(state); }
};

using WatchCallback = std::function;
/// Passing the watch callback as a shared_ptr allows us to:
/// - avoid copying the callback;
/// - register the same callback only once per path.
using WatchCallbackPtr = std::shared_ptr;

struct SetACLRequest : virtual Request
{
    String path;
    ACLs acls;
    int32_t version = -1;

    void addRootPath(const String & root_path) override;
    String getPath() const override { return path; }

    size_t bytesSize() const override { return path.size() + sizeof(version) + acls.size() * sizeof(ACL); }
};

struct SetACLResponse : virtual Response
{
    Stat stat;

    size_t bytesSize() const override { return sizeof(Stat); }
};

struct GetACLRequest : virtual Request
{
    String path;

    void addRootPath(const String & root_path) override;
    String getPath() const override { return path; }
    size_t bytesSize() const override { return path.size(); }
};

struct GetACLResponse : virtual Response
{
    ACLs acl;
    Stat stat;

    size_t bytesSize() const override { return sizeof(Stat) + acl.size() * sizeof(ACL); }
};

struct CreateRequest : virtual Request
{
    String path;
    String data;
    bool is_ephemeral = false;
    bool is_sequential = false;
    ACLs acls;
    /// Whether creation should succeed if the node already exists.
    bool not_exists = false;

    void addRootPath(const String & root_path) override;
    String getPath() const override { return path; }

    size_t bytesSize() const override
    {
        return path.size() + data.size() + sizeof(is_ephemeral) + sizeof(is_sequential) + acls.size() * sizeof(ACL);
    }
};

struct CreateResponse : virtual Response
{
    String path_created;

    void removeRootPath(const String & root_path) override;

    size_t bytesSize() const override { return path_created.size(); }
};

struct RemoveRequest : virtual Request
{
    String path;
    int32_t version = -1;

    void addRootPath(const String & root_path) override;
    String getPath() const override { return path; }

    size_t bytesSize() const override { return path.size() + sizeof(version); }
};

struct RemoveResponse : virtual Response
{
};

struct RemoveRecursiveRequest : virtual Request
{
    String path;

    /// Strict limit on the number of nodes deleted.
    uint32_t remove_nodes_limit = 1;

    void addRootPath(const String & root_path) override;
    String getPath() const override { return path; }

    size_t bytesSize() const override { return path.size() + sizeof(remove_nodes_limit); }
};

struct RemoveRecursiveResponse : virtual Response
{
};

struct ExistsRequest : virtual Request
{
    String path;

    void addRootPath(const String & root_path) override;
    String getPath() const override { return path; }

    size_t bytesSize() const override { return path.size(); }
};

struct ExistsResponse : virtual Response
{
    Stat stat;

    size_t bytesSize() const override { return sizeof(Stat); }
};

struct GetRequest : virtual Request
{
    String path;

    void addRootPath(const String & root_path) override;
    String getPath() const override { return path; }

    size_t bytesSize() const override { return path.size(); }
};

struct GetResponse : virtual Response
{
    String data;
    Stat stat;

    size_t bytesSize() const override { return data.size() + sizeof(stat); }
};

struct SetRequest : virtual Request
{
    String path;
    String data;
    int32_t version = -1;

    void addRootPath(const String & root_path) override;
    String getPath() const override { return path; }

    size_t bytesSize() const override { return path.size() + data.size() + sizeof(version); }
};

struct SetResponse : virtual Response
{
    Stat stat;

    size_t bytesSize() const override { return sizeof(stat); }
};

/// Argument of IKeeper::list(); presumably selects which kinds of children are returned.
enum class ListRequestType : uint8_t
{
    ALL,
    PERSISTENT_ONLY,
    EPHEMERAL_ONLY
};

struct ListRequest : virtual Request
{
    String path;

    void addRootPath(const String & root_path) override;
    String getPath() const override { return path; }

    size_t bytesSize() const override { return path.size(); }
};

struct ListResponse : virtual Response
{
    std::vector names;
    Stat stat;

    /// sizeof(stat) plus the total length of all child names.
    size_t bytesSize() const override
    {
        size_t size = sizeof(stat);
        for (const auto & name : names)
            size += name.size();
        return size;
    }
};

struct CheckRequest : virtual Request
{
    String path;
    int32_t version = -1;

    /// Whether to check that the node does NOT exist.
    bool not_exists = false;

    void addRootPath(const String & root_path) override;
    String getPath() const override { return path; }

    /// NOTE(review): does not count sizeof(not_exists).
    size_t bytesSize() const override { return path.size() + sizeof(version); }
};

struct CheckResponse : virtual Response
{
};

struct SyncRequest : virtual Request
{
    String path;

    void addRootPath(const String & root_path) override;
    String getPath() const override { return path; }

    size_t bytesSize() const override { return path.size(); }
};

struct SyncResponse : virtual Response
{
    String path;

    size_t bytesSize() const override { return path.size(); }
};

struct ReconfigRequest : virtual Request
{
    String joining;
    String leaving;
    String new_members;
    /// NOTE(review): unlike the `version = -1` defaults of the other requests, this member has no
    /// initializer, so a default-initialized ReconfigRequest carries an indeterminate version.
    int32_t version;

    /// Always `keeper_config_path` (declared outside this file). addRootPath is not overridden,
    /// so the base no-op applies.
    String getPath() const final { return keeper_config_path; }

    size_t bytesSize() const final { return joining.size() + leaving.size() + new_members.size() + sizeof(version); }
};

struct ReconfigResponse : virtual Response
{
    String value;
    Stat stat;

    size_t bytesSize() const override { return value.size() + sizeof(stat); }
};

/// Batch of sub-requests. The template parameter list is missing in this copy. The elements of
/// `requests` are used through `->`, so they are presumably request pointers.
template
struct MultiRequest : virtual Request
{
    std::vector requests;

    /// Forwards the root path to every sub-request.
    void addRootPath(const String & root_path) override
    {
        for (auto & request : requests)
            request->addRootPath(root_path);
    }

    /// A batch has no single path; returns an empty string.
    String getPath() const override { return {}; }

    /// Sum of the sub-requests' estimates.
    size_t bytesSize() const override
    {
        size_t size = 0;
        for (const auto & request : requests)
            size += request->bytesSize();
        return size;
    }
};

struct MultiResponse : virtual Response
{
    Responses responses;

    void removeRootPath(const String & root_path) override;

    /// Sum of the sub-responses' estimates.
    size_t bytesSize() const override
    {
        size_t size = 0;
        for (const auto & response : responses)
            size += response->bytesSize();
        return size;
    }
};

/// This response may be received only as an element of responses in MultiResponse.
struct ErrorResponse : virtual Response
{
};

/// Completion callbacks for the corresponding IKeeper operations (signatures missing in this copy).
using CreateCallback = std::function;
using RemoveCallback = std::function;
using RemoveRecursiveCallback = std::function;
using ExistsCallback = std::function;
using GetCallback = std::function;
using SetCallback = std::function;
using ListCallback = std::function;
using CheckCallback = std::function;
using SyncCallback = std::function;
using ReconfigCallback = std::function;
using MultiCallback = std::function;

/// For watches.
enum State
{
    EXPIRED_SESSION = -112,
    AUTH_FAILED = -113,
    CONNECTING = 1,
    ASSOCIATING = 2,
    CONNECTED = 3,
    READONLY = 5,
    NOTCONNECTED = 999
};

enum Event
{
    CREATED = 1,
    DELETED = 2,
    CHANGED = 3,
    CHILD = 4,
    SESSION = -1,
    NOTWATCHING = -2
};

/// Exception for coordination errors. It carries the coordination `code` in addition to the DB::Exception
/// state; the inline constructors pass DB::ErrorCodes::KEEPER_EXCEPTION as the DB-level code.
class Exception : public DB::Exception
{
private:
    /// Delegate constructor, used to minimize repetition; last parameter used for overload resolution.
    Exception(const std::string & msg, Error code_, int); /// NOLINT
    Exception(PreformattedMessage && msg, Error code_);

    /// Message must be a compile-time constant
    template requires std::is_convertible_v
    Exception(T && message, Error code_) : DB::Exception(std::forward(message), DB::ErrorCodes::KEEPER_EXCEPTION, /* remote_= */ false), code(code_)
    {
        incrementErrorMetrics(code);
    }

    /// Called by the inline constructors with the coordination code. Defined elsewhere; it presumably
    /// bumps per-error metrics.
    static void incrementErrorMetrics(Error code_);

public:
    explicit Exception(Error code_); /// NOLINT
    Exception(const Exception & exc);

    /// Formats the message from `fmt` and `args`.
    template
    Exception(Error code_, FormatStringHelper fmt, Args &&... args)
        : DB::Exception(DB::ErrorCodes::KEEPER_EXCEPTION, std::move(fmt), std::forward(args)...)
        , code(code_)
    {
        incrementErrorMetrics(code);
    }

    /// Builds an exception from a runtime string through the private delegate constructor.
    static Exception createDeprecated(const std::string & msg, Error code_)
    {
        return Exception(msg, code_, 0);
    }

    /// Message has the form "Coordination error: <errorMessage(code_)>, path <path>".
    static Exception fromPath(Error code_, const std::string & path)
    {
        return Exception(code_, "Coordination error: {}, path {}", errorMessage(code_), path);
    }

    /// Message must be a compile-time constant
    template requires std::is_convertible_v
    static Exception fromMessage(Error code_, T && message)
    {
        return Exception(std::forward(message), code_);
    }

    const char * name() const noexcept override { return "Coordination::Exception"; }
    const char * className() const noexcept override { return "Coordination::Exception"; }

    /// Returns a new heap-allocated copy; the caller presumably takes ownership.
    Exception * clone() const override { return new Exception(*this); }

    /// Coordination error code of this exception.
    const Error code;
};

/// Fault-injection helper (implementation not in this file). The destructor is declared
/// noexcept(false), so it may throw; presumably it injects a fault with probability
/// `probability_after` when the guarded scope ends.
class SimpleFaultInjection
{
public:
    SimpleFaultInjection(Float64 probability_before, Float64 probability_after_, const String & description_);
    ~SimpleFaultInjection() noexcept(false);

private:
    Float64 probability_after = 0;
    String description;
    int exceptions_level = 0;
};

/** Usage scenario:
  * - create an object and issue commands;
  * - you provide callbacks for your commands; callbacks are invoked in an internal thread and must be cheap:
  *   for example, just signal a condvar / fulfill a promise.
  * - you may also provide callbacks for watches; they are also invoked in an internal thread and must be cheap.
  * - whenever you receive an exception with the ZSESSIONEXPIRED code, or the method isExpired returns true,
  *   the ZooKeeper instance is no longer usable - you may only destroy it and probably create another.
  * - whenever the session expires or the ZooKeeper instance is being destroyed, all callbacks are notified with a special event.
  * - data used by callbacks must stay alive as long as the ZooKeeper instance is alive, so try to avoid
  *   capturing references in callbacks; it's error-prone.
  */
class IKeeper
{
public:
    virtual ~IKeeper() = default;

    /// If expired, you can only destroy the object. All other methods will throw an exception.
    virtual bool isExpired() const = 0;

    /// Get the index of the currently connected node.
    virtual std::optional getConnectedNodeIdx() const = 0;

    /// Get the host and port of the current connection.
    virtual String getConnectedHostPort() const = 0;

    /// Get the xid of the current connection.
    virtual int64_t getConnectionXid() const = 0;

    /// Useful to check the owner of an ephemeral node.
    virtual int64_t getSessionID() const = 0;

    /// The default implementation returns an empty string (presumably meaning "unknown").
    virtual String tryGetAvailabilityZone() { return ""; }

    /// If a method throws an exception, its callback will not be called.
    ///
    /// After the method has executed successfully, you must wait for the callback
    ///  (do not destroy callback data before the callback has been called).
    /// TODO: The above line describes an error-prone interface. It would be better
    ///  to replace callbacks with std::future results, so the caller does not have to think about
    ///  the lifetime of the callback data.
    ///
    /// All callbacks are executed sequentially (the execution of callbacks is serialized).
    ///
    /// If an exception is thrown inside a callback, the session will expire,
    ///  and all other callbacks will be called with the "Session expired" error.

    virtual void create(
        const String & path,
        const String & data,
        bool is_ephemeral,
        bool is_sequential,
        const ACLs & acls,
        CreateCallback callback) = 0;

    virtual void remove(
        const String & path,
        int32_t version,
        RemoveCallback callback) = 0;

    virtual void removeRecursive(
        const String & path,
        uint32_t remove_nodes_limit,
        RemoveRecursiveCallback callback) = 0;

    virtual void exists(
        const String & path,
        ExistsCallback callback,
        WatchCallbackPtr watch) = 0;

    virtual void get(
        const String & path,
        GetCallback callback,
        WatchCallbackPtr watch) = 0;

    virtual void set(
        const String & path,
        const String & data,
        int32_t version,
        SetCallback callback) = 0;

    virtual void list(
        const String & path,
        ListRequestType list_request_type,
        ListCallback callback,
        WatchCallbackPtr watch) = 0;

    virtual void check(
        const String & path,
        int32_t version,
        CheckCallback callback) = 0;

    virtual void sync(
        const String & path,
        SyncCallback callback) = 0;

    virtual void reconfig(
        std::string_view joining,
        std::string_view leaving,
        std::string_view new_members,
        int32_t version,
        ReconfigCallback callback) = 0;

    /// Two overloads: a span of request pointers, or a Requests vector.
    virtual void multi(
        std::span requests,
        MultiCallback callback) = 0;

    virtual void multi(
        const Requests & requests,
        MultiCallback callback) = 0;

    virtual bool isFeatureEnabled(DB::KeeperFeatureFlag feature_flag) const = 0;

    /// The default implementation returns nullptr.
    virtual const DB::KeeperFeatureFlags * getKeeperFeatureFlags() const { return nullptr; }

    /// Expire the session and finish all pending requests.
    virtual void finalize(const String & reason) = 0;
};

}

/// Formats a Coordination::Error as errorMessage(code). The template arguments of this
/// specialization and of its base are missing in this copy.
template <> struct fmt::formatter : fmt::formatter
{
    constexpr auto format(Coordination::Error code, auto & ctx) const
    {
        return formatter::format(Coordination::errorMessage(code), ctx);
    }
};