#pragma once #include #include "config.h" #if USE_HIVE #include #include #include #include #include #include #include #include namespace DB { using ThriftHiveMetastoreClientBuilder = std::function()>; class ThriftHiveMetastoreClientPool : public PoolBase { public: using Object = Apache::Hadoop::Hive::ThriftHiveMetastoreClient; using ObjectPtr = std::shared_ptr; using Entry = PoolBase::Entry; explicit ThriftHiveMetastoreClientPool(ThriftHiveMetastoreClientBuilder builder_); protected: ObjectPtr allocObject() override { return builder(); } private: ThriftHiveMetastoreClientBuilder builder; }; class HiveMetastoreClient { public: struct FileInfo { String path; UInt64 last_modify_time; /// In ms size_t size; explicit FileInfo() = default; FileInfo & operator = (const FileInfo &) = default; FileInfo(const FileInfo &) = default; FileInfo(const String & path_, UInt64 last_modify_time_, size_t size_) : path(path_), last_modify_time(last_modify_time_), size(size_) { } }; struct PartitionInfo { Apache::Hadoop::Hive::Partition partition; std::vector files; bool initialized = false; /// If true, files are initialized. explicit PartitionInfo(const Apache::Hadoop::Hive::Partition & partition_): partition(partition_) {} PartitionInfo(PartitionInfo &&) = default; bool haveSameParameters(const Apache::Hadoop::Hive::Partition & other) const; }; class HiveTableMetadata; using HiveTableMetadataPtr = std::shared_ptr; /// Used for speeding up metadata query process. class HiveTableMetadata : boost::noncopyable { public: HiveTableMetadata( const String & db_name_, const String & table_name_, std::shared_ptr table_, const std::vector & partitions_) : db_name(db_name_) , table_name(table_name_) , table(std::move(table_)) , empty_partition_keys(table->partitionKeys.empty()) , hive_files_cache(std::make_shared(10000)) { std::lock_guard lock(mutex); for (const auto & partition : partitions_) partition_infos.emplace(partition.sd.location, PartitionInfo(partition)); } std::shared_ptr getTable() const { return table; } std::vector getPartitions() const; std::vector getFilesByLocation(const HDFSFSPtr & fs, const String & location); HiveFilesCachePtr getHiveFilesCache() const; void updateIfNeeded(const std::vector & partitions); private: bool shouldUpdate(const std::vector & partitions); const String db_name; const String table_name; const std::shared_ptr table; /// Mutex to protect partition_infos. mutable std::mutex mutex; std::map partition_infos; const bool empty_partition_keys; const HiveFilesCachePtr hive_files_cache; LoggerPtr log = getLogger("HiveMetastoreClient"); }; explicit HiveMetastoreClient(ThriftHiveMetastoreClientBuilder builder_) : table_metadata_cache(1000) , client_pool(builder_) { } HiveTableMetadataPtr getTableMetadata(const String & db_name, const String & table_name); // Access hive table information by hive client std::shared_ptr getHiveTable(const String & db_name, const String & table_name); void clearTableMetadata(const String & db_name, const String & table_name); private: static String getCacheKey(const String & db_name, const String & table_name) { return db_name + "." + table_name; } void tryCallHiveClient(std::function func); CacheBase table_metadata_cache; ThriftHiveMetastoreClientPool client_pool; LoggerPtr log = getLogger("HiveMetastoreClient"); }; using HiveMetastoreClientPtr = std::shared_ptr; class HiveMetastoreClientFactory final : private boost::noncopyable { public: static HiveMetastoreClientFactory & instance(); HiveMetastoreClientPtr getOrCreate(const String & name); private: static std::shared_ptr createThriftHiveMetastoreClient(const String & name); std::mutex mutex; std::map clients; }; } #endif