#pragma once #include #include #include #include #include #include #include #include #include #include #include #include namespace CurrentMetrics { extern const Metric CacheDictionaryUpdateQueueBatches; extern const Metric CacheDictionaryUpdateQueueKeys; } namespace DB { /** This class is passed between update queue and update queue client during update. For simple keys we pass simple keys. For complex keys we pass complex keys columns and requested rows to update. During update cache dictionary should fill requested_keys_to_fetched_columns_during_update_index and fetched_columns_during_update. For complex key to extend lifetime of key complex key arena should be used. */ template class CacheDictionaryUpdateUnit { public: using KeyType = std::conditional_t; /// Constructor for complex keys update request explicit CacheDictionaryUpdateUnit( const Columns & key_columns_, const PaddedPODArray & key_index_to_state_from_storage_, const DictionaryStorageFetchRequest & request_, size_t keys_to_update_size_) : key_columns(key_columns_) , key_index_to_state(key_index_to_state_from_storage_.begin(), key_index_to_state_from_storage_.end()) , request(request_) , keys_to_update_size(keys_to_update_size_) , alive_keys(CurrentMetrics::CacheDictionaryUpdateQueueKeys, keys_to_update_size) {} CacheDictionaryUpdateUnit() : keys_to_update_size(0) , alive_keys(CurrentMetrics::CacheDictionaryUpdateQueueKeys, 0) {} const Columns key_columns; const PaddedPODArray key_index_to_state; const DictionaryStorageFetchRequest request; const size_t keys_to_update_size; HashMap requested_keys_to_fetched_columns_during_update_index; MutableColumns fetched_columns_during_update; /// Complex keys are serialized in this arena DictionaryKeysArenaHolder complex_keys_arena_holder; private: template friend class CacheDictionaryUpdateQueue; mutable std::mutex update_mutex; mutable std::condition_variable is_update_finished; bool is_done{false}; std::exception_ptr current_exception{nullptr}; /// NOLINT /// While UpdateUnit is alive, it is accounted in update_queue size. CurrentMetrics::Increment alive_batch{CurrentMetrics::CacheDictionaryUpdateQueueBatches}; CurrentMetrics::Increment alive_keys; }; template using CacheDictionaryUpdateUnitPtr = std::shared_ptr>; extern template class CacheDictionaryUpdateUnit; extern template class CacheDictionaryUpdateUnit; struct CacheDictionaryUpdateQueueConfiguration { /// Size of update queue const size_t max_update_queue_size; /// Size in thead pool of update queue const size_t max_threads_for_updates; /// Timeout for trying to push update unit into queue const size_t update_queue_push_timeout_milliseconds; /// Timeout during sync waititing of update unit const size_t query_wait_timeout_milliseconds; }; /** Responsibility of this class is to provide asynchronous and synchronous update support for CacheDictionary It is responsibility of CacheDictionary to perform update with UpdateUnit using UpdateFunction. */ template class CacheDictionaryUpdateQueue { public: /// Client of update queue must provide this function in constructor and perform update using update unit. using UpdateFunction = std::function)>; CacheDictionaryUpdateQueue( String dictionary_name_for_logs_, CacheDictionaryUpdateQueueConfiguration configuration_, UpdateFunction && update_func_); ~CacheDictionaryUpdateQueue(); /// Get configuration that was passed to constructor const CacheDictionaryUpdateQueueConfiguration & getConfiguration() const { return configuration; } /// Is queue finished bool isFinished() const { return update_queue.isFinished(); } /// Synchronous wait for update queue to stop void stopAndWait(); /** Try to add update unit into queue. If queue is full and oush cannot be performed in update_queue_push_timeout_milliseconds from configuration an exception will be thrown. If queue already finished an exception will be thrown. */ void tryPushToUpdateQueueOrThrow(CacheDictionaryUpdateUnitPtr & update_unit_ptr); /** Try to synchronously wait for update completion. If exception was passed from update function during update it will be rethrowed. If update will not be finished in query_wait_timeout_milliseconds from configuration an exception will be thrown. If queue already finished an exception will be thrown. */ void waitForCurrentUpdateFinish(CacheDictionaryUpdateUnitPtr & update_unit_ptr) const; private: void updateThreadFunction(); using UpdateQueue = ConcurrentBoundedQueue>; String dictionary_name_for_logs; CacheDictionaryUpdateQueueConfiguration configuration; UpdateFunction update_func; UpdateQueue update_queue; ThreadPool update_pool; }; extern template class CacheDictionaryUpdateQueue; extern template class CacheDictionaryUpdateQueue; }