#pragma once #include "config.h" #include #include #include #include #include #include #include #include #include namespace fs = std::filesystem; namespace Poco { class Logger; } namespace DB { class StorageObjectStorageQueue; struct ObjectStorageQueueSettings; struct ObjectStorageQueueTableMetadata; struct StorageInMemoryMetadata; using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr; /** * A class for managing ObjectStorageQueue metadata in zookeeper, e.g. * the following folders: * - /processed * - /processing * - /failed * * In case we use buckets for processing for Ordered mode, the structure looks like: * - /buckets//processed -- persistent node, information about last processed file. * - /buckets//lock -- ephemeral node, used for acquiring bucket lock. * - /processing * - /failed * * Depending on ObjectStorageQueue processing mode (ordered or unordered) * we can differently store metadata in /processed node. * * Implements caching of zookeeper metadata for faster responses. * Cached part is located in LocalFileStatuses. * * In case of Unordered mode - if files TTL is enabled or maximum tracked files limit is set * starts a background cleanup thread which is responsible for maintaining them. */ class ObjectStorageQueueMetadata { public: using FileStatus = ObjectStorageQueueIFileMetadata::FileStatus; using FileMetadataPtr = std::shared_ptr; using FileStatusPtr = std::shared_ptr; using FileStatuses = std::unordered_map; using Bucket = size_t; using Processor = std::string; ObjectStorageQueueMetadata( const fs::path & zookeeper_path_, const ObjectStorageQueueTableMetadata & table_metadata_, size_t cleanup_interval_min_ms_, size_t cleanup_interval_max_ms_); ~ObjectStorageQueueMetadata(); static ObjectStorageQueueTableMetadata syncWithKeeper( const fs::path & zookeeper_path, const ObjectStorageQueueSettings & settings, const ColumnsDescription & columns, const std::string & format, const ContextPtr & context, bool is_attach, LoggerPtr log); void shutdown(); FileMetadataPtr getFileMetadata(const std::string & path, ObjectStorageQueueOrderedFileMetadata::BucketInfoPtr bucket_info = {}); FileStatusPtr getFileStatus(const std::string & path); FileStatuses getFileStatuses() const; /// Method of Ordered mode parallel processing. bool useBucketsForProcessing() const; Bucket getBucketForPath(const std::string & path) const; ObjectStorageQueueOrderedFileMetadata::BucketHolderPtr tryAcquireBucket(const Bucket & bucket, const Processor & processor); static size_t getBucketsNum(const ObjectStorageQueueTableMetadata & metadata); void checkTableMetadataEquals(const ObjectStorageQueueMetadata & other); const ObjectStorageQueueTableMetadata & getTableMetadata() const { return table_metadata; } ObjectStorageQueueTableMetadata & getTableMetadata() { return table_metadata; } void alterSettings(const SettingsChanges & changes); private: void cleanupThreadFunc(); void cleanupThreadFuncImpl(); ObjectStorageQueueTableMetadata table_metadata; const ObjectStorageQueueMode mode; const fs::path zookeeper_path; const size_t buckets_num; const size_t cleanup_interval_min_ms, cleanup_interval_max_ms; LoggerPtr log; std::atomic_bool shutdown_called = false; BackgroundSchedulePool::TaskHolder task; class LocalFileStatuses; std::shared_ptr local_file_statuses; }; using ObjectStorageQueueMetadataPtr = std::unique_ptr; }