#pragma once #include #include #include #include #include #include #include namespace DB { class WorkloadEntityStorageBase : public IWorkloadEntityStorage { public: explicit WorkloadEntityStorageBase(ContextPtr global_context_); ASTPtr get(const String & entity_name) const override; ASTPtr tryGet(const String & entity_name) const override; bool has(const String & entity_name) const override; std::vector getAllEntityNames() const override; std::vector getAllEntityNames(WorkloadEntityType entity_type) const override; std::vector> getAllEntities() const override; bool empty() const override; bool storeEntity( const ContextPtr & current_context, WorkloadEntityType entity_type, const String & entity_name, ASTPtr create_entity_query, bool throw_if_exists, bool replace_if_exists, const Settings & settings) override; bool removeEntity( const ContextPtr & current_context, WorkloadEntityType entity_type, const String & entity_name, bool throw_if_not_exists) override; scope_guard getAllEntitiesAndSubscribe( const OnChangedHandler & handler) override; protected: enum class OperationResult { Ok, Failed, Retry }; virtual OperationResult storeEntityImpl( const ContextPtr & current_context, WorkloadEntityType entity_type, const String & entity_name, ASTPtr create_entity_query, bool throw_if_exists, bool replace_if_exists, const Settings & settings) = 0; virtual OperationResult removeEntityImpl( const ContextPtr & current_context, WorkloadEntityType entity_type, const String & entity_name, bool throw_if_not_exists) = 0; std::unique_lock getLock() const; /// Replace current `entities` with `new_entities` and notifies subscribers. /// Note that subscribers will be notified with a sequence of events. /// It is guaranteed that all itermediate states (between every pair of consecutive events) /// will be consistent (all references between entities will be valid) void setAllEntities(const std::vector> & new_entities); /// Serialize `entities` stored in memory plus one optional `change` into multiline string String serializeAllEntities(std::optional change = {}); private: /// Change state in memory void applyEvent(std::unique_lock & lock, const Event & event); /// Notify subscribers about changes describe by vector of events `tx` void unlockAndNotify(std::unique_lock & lock, std::vector tx); /// Return true iff `references` has a path from `source` to `target` bool isIndirectlyReferenced(const String & target, const String & source); /// Adds references that are described by `entity` to `references` void insertReferences(const ASTPtr & entity); /// Removes references that are described by `entity` from `references` void removeReferences(const ASTPtr & entity); /// Returns an ordered vector of `entities` std::vector orderEntities( const std::unordered_map & all_entities, std::optional change = {}); struct Handlers { std::mutex mutex; std::list list; }; /// shared_ptr is here for safety because WorkloadEntityStorageBase can be destroyed before all subscriptions are removed. std::shared_ptr handlers; mutable std::recursive_mutex mutex; std::unordered_map entities; /// Maps entity name into CREATE entity query // Validation std::unordered_map> references; /// Keep track of references between entities. Key is target. Value is set of sources String root_name; /// current root workload name protected: ContextPtr global_context; LoggerPtr log; }; }