#pragma once #include "config.h" #if USE_HIVE #include #include #include #include #include #include #include #include #include namespace orc { class Statistics; class ColumnStatistics; } namespace DB { struct HiveSettings; namespace ErrorCodes { extern const int NOT_IMPLEMENTED; } class IHiveFile : public WithContext { public: using MinMaxIndex = IMergeTreeDataPart::MinMaxIndex; using MinMaxIndexPtr = std::shared_ptr; enum class FileFormat : uint8_t { RC_FILE, TEXT, LZO_TEXT, SEQUENCE_FILE, AVRO, PARQUET, ORC, }; inline static const String RCFILE_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat"; inline static const String TEXT_INPUT_FORMAT = "org.apache.hadoop.mapred.TextInputFormat"; inline static const String LZO_TEXT_INPUT_FORMAT = "com.hadoop.mapred.DeprecatedLzoTextInputFormat"; inline static const String SEQUENCE_INPUT_FORMAT = "org.apache.hadoop.mapred.SequenceFileInputFormat"; inline static const String PARQUET_INPUT_FORMAT = "com.cloudera.impala.hive.serde.ParquetInputFormat"; inline static const String MR_PARQUET_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"; inline static const String AVRO_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"; inline static const String ORC_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"; inline static const std::map VALID_HDFS_FORMATS = { {RCFILE_INPUT_FORMAT, FileFormat::RC_FILE}, {TEXT_INPUT_FORMAT, FileFormat::TEXT}, {LZO_TEXT_INPUT_FORMAT, FileFormat::LZO_TEXT}, {SEQUENCE_INPUT_FORMAT, FileFormat::SEQUENCE_FILE}, {PARQUET_INPUT_FORMAT, FileFormat::PARQUET}, {MR_PARQUET_INPUT_FORMAT, FileFormat::PARQUET}, {AVRO_INPUT_FORMAT, FileFormat::AVRO}, {ORC_INPUT_FORMAT, FileFormat::ORC}, }; static bool isFormatClass(const String & format_class) { return VALID_HDFS_FORMATS.contains(format_class); } static FileFormat toFileFormat(const String & format_class) { if (isFormatClass(format_class)) { return VALID_HDFS_FORMATS.find(format_class)->second; } throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported hdfs file format {}", format_class); } IHiveFile( const FieldVector & partition_values_, const String & namenode_url_, const String & path_, UInt64 last_modify_time_, size_t size_, const NamesAndTypesList & index_names_and_types_, const std::shared_ptr & storage_settings_, const ContextPtr & context_) : WithContext(context_) , partition_values(partition_values_) , namenode_url(namenode_url_) , path(path_) , last_modify_time(last_modify_time_) , size(size_) , index_names_and_types(index_names_and_types_) , storage_settings(storage_settings_) { } virtual ~IHiveFile() = default; String getFormatName() const { return String(magic_enum::enum_name(getFormat())); } const String & getPath() const { return path; } UInt64 getLastModTs() const { return last_modify_time; } size_t getSize() const { return size; } std::optional getRows(); const FieldVector & getPartitionValues() const { return partition_values; } const String & getNamenodeUrl() { return namenode_url; } MinMaxIndexPtr getMinMaxIndex() const { return file_minmax_idx; } const std::vector & getSubMinMaxIndexes() const { return split_minmax_idxes; } const std::unordered_set & getSkipSplits() const { return skip_splits; } void setSkipSplits(const std::unordered_set & skip_splits_) { skip_splits = skip_splits_; } String describeMinMaxIndex(const MinMaxIndexPtr & idx) const { if (!idx) return ""; std::vector strs; strs.reserve(index_names_and_types.size()); size_t i = 0; for (const auto & name_type : 
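    /// Render a minmax index in a human-readable form: "name:type" plus the range of
    /// each indexed column, joined with "|". The range text comes from Range::toString();
    /// an illustrative (not verbatim) example:
    ///     "id:Int64[1, 100]|dt:Date[2021-01-01, 2021-12-31]"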
    String describeMinMaxIndex(const MinMaxIndexPtr & idx) const
    {
        if (!idx)
            return "";

        std::vector<String> strs;
        strs.reserve(index_names_and_types.size());
        size_t i = 0;
        for (const auto & name_type : index_names_and_types)
            strs.push_back(name_type.name + ":" + name_type.type->getName() + idx->hyperrectangle[i++].toString());
        return boost::algorithm::join(strs, "|");
    }

    virtual FileFormat getFormat() const = 0;

    /// Whether the hive query can use the file-level minmax index.
    virtual bool useFileMinMaxIndex() const { return false; }
    void loadFileMinMaxIndex();

    /// Whether the hive query can use the sub-file (split) level minmax indexes.
    virtual bool useSplitMinMaxIndex() const { return false; }
    void loadSplitMinMaxIndexes();

protected:
    virtual void loadFileMinMaxIndexImpl()
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadFileMinMaxIndexImpl is not supported by hive file: {}", getFormatName());
    }

    virtual void loadSplitMinMaxIndexesImpl()
    {
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method loadSplitMinMaxIndexesImpl is not supported by hive file: {}", getFormatName());
    }

    virtual std::optional<size_t> getRowsImpl() = 0;

    FieldVector partition_values;
    String namenode_url;
    String path;
    UInt64 last_modify_time;
    size_t size;

    std::atomic<bool> has_init_rows = false;
    std::optional<size_t> rows;

    NamesAndTypesList index_names_and_types;

    MinMaxIndexPtr file_minmax_idx;
    std::atomic<bool> file_minmax_idx_loaded{false};

    std::vector<MinMaxIndexPtr> split_minmax_idxes;
    std::atomic<bool> split_minmax_idxes_loaded{false};

    /// Splits of this file to skip after applying the minmax indexes (if any).
    std::unordered_set<int> skip_splits;
    std::shared_ptr<HiveSettings> storage_settings;

    /// An IHiveFile may be shared among multiple threads, so updating the min/max indexes requires the lock's protection.
    std::mutex mutex;
};

using HiveFilePtr = std::shared_ptr<IHiveFile>;
using HiveFiles = std::vector<HiveFilePtr>;

/// Cache of hive file metadata, keyed by file path.
using HiveFilesCache = CacheBase<String, IHiveFile>;
using HiveFilesCachePtr = std::shared_ptr<HiveFilesCache>;

/// Plain text hive file: carries no metadata from which minmax indexes or a row count could be built.
class HiveTextFile : public IHiveFile
{
public:
    HiveTextFile(
        const FieldVector & partition_values_,
        const String & namenode_url_,
        const String & path_,
        UInt64 last_modify_time_,
        size_t size_,
        const NamesAndTypesList & index_names_and_types_,
        const std::shared_ptr<HiveSettings> & hive_settings_,
        const ContextPtr & context_)
        : IHiveFile(partition_values_, namenode_url_, path_, last_modify_time_, size_, index_names_and_types_, hive_settings_, context_)
    {
    }

    FileFormat getFormat() const override { return FileFormat::TEXT; }

private:
    std::optional<size_t> getRowsImpl() override { return {}; }
};

/// ORC hive file: minmax indexes are built from the column statistics in the ORC metadata,
/// both for the whole file and per stripe (split).
class HiveORCFile : public IHiveFile
{
public:
    HiveORCFile(
        const FieldVector & partition_values_,
        const String & namenode_url_,
        const String & path_,
        UInt64 last_modify_time_,
        size_t size_,
        const NamesAndTypesList & index_names_and_types_,
        const std::shared_ptr<HiveSettings> & hive_settings_,
        const ContextPtr & context_)
        : IHiveFile(partition_values_, namenode_url_, path_, last_modify_time_, size_, index_names_and_types_, hive_settings_, context_)
    {
    }

    FileFormat getFormat() const override { return FileFormat::ORC; }
    bool useFileMinMaxIndex() const override;
    bool useSplitMinMaxIndex() const override;

private:
    static Range buildRange(const orc::ColumnStatistics * col_stats);

    void loadFileMinMaxIndexImpl() override;
    void loadSplitMinMaxIndexesImpl() override;
    std::unique_ptr<MinMaxIndex> buildMinMaxIndex(const orc::Statistics * statistics);
    void prepareReader();
    void prepareColumnMapping();

    std::optional<size_t> getRowsImpl() override;

    std::unique_ptr<ReadBufferFromHDFS> in;
    std::unique_ptr<arrow::adapters::orc::ORCFileReader> reader;
    std::map<String, size_t> orc_column_positions;
};
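/// Parquet hive file: split-level minmax indexes are built from the column statistics
/// in the Parquet footer, with each row group treated as one split.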
class HiveParquetFile : public IHiveFile
{
public:
    HiveParquetFile(
        const FieldVector & partition_values_,
        const String & namenode_url_,
        const String & path_,
        UInt64 last_modify_time_,
        size_t size_,
        const NamesAndTypesList & index_names_and_types_,
        const std::shared_ptr<HiveSettings> & hive_settings_,
        const ContextPtr & context_)
        : IHiveFile(partition_values_, namenode_url_, path_, last_modify_time_, size_, index_names_and_types_, hive_settings_, context_)
    {
    }

    FileFormat getFormat() const override { return FileFormat::PARQUET; }
    bool useSplitMinMaxIndex() const override;

private:
    void loadSplitMinMaxIndexesImpl() override;
    std::optional<size_t> getRowsImpl() override;
    void prepareReader();

    std::unique_ptr<ReadBufferFromHDFS> in;
    std::unique_ptr<parquet::arrow::FileReader> reader;
    std::map<String, size_t> parquet_column_positions;
};

}

#endif