#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { class Block; struct Settings; class SettingsChanges; struct FormatFactorySettings; struct ReadSettings; class ReadBuffer; class WriteBuffer; class IProcessor; using ProcessorPtr = std::shared_ptr; class IInputFormat; class IOutputFormat; class IRowOutputFormat; struct RowInputFormatParams; class ISchemaReader; class IExternalSchemaReader; using SchemaReaderPtr = std::shared_ptr; using ExternalSchemaReaderPtr = std::shared_ptr; using InputFormatPtr = std::shared_ptr; using OutputFormatPtr = std::shared_ptr; using RowOutputFormatPtr = std::shared_ptr; template struct Memory; FormatSettings getFormatSettings(const ContextPtr & context); FormatSettings getFormatSettings(const ContextPtr & context, const Settings & settings); /** Allows to create an IInputFormat or IOutputFormat by the name of the format. * Note: format and compression are independent things. */ class FormatFactory final : private boost::noncopyable { public: /** Fast reading data from buffer and save result to memory. * Reads at least `min_bytes` and some more until the end of the chunk, depends on the format. * If `max_rows` is non-zero the function also stops after reading the `max_rows` number of rows * (even if the `min_bytes` boundary isn't reached yet). * Used in ParallelParsingInputFormat. */ using FileSegmentationEngine = std::function( ReadBuffer & buf, DB::Memory> & memory, size_t min_bytes, size_t max_rows)>; using FileSegmentationEngineCreator = std::function; private: // On the input side, there are two kinds of formats: // * InputCreator - formats parsed sequentially, e.g. CSV. Almost all formats are like this. // FormatFactory uses ParallelReadBuffer to read in parallel, and ParallelParsingInputFormat // to parse in parallel; the formats mostly don't need to worry about it. // * RandomAccessInputCreator - column-oriented formats that require seeking back and forth in // the file when reading. E.g. Parquet has metadata at the end of the file (needs to be read // before we can parse any data), can skip columns by seeking in the file, and often reads // many short byte ranges from the file. ParallelReadBuffer and ParallelParsingInputFormat // are a poor fit. Instead, the format implementation is in charge of parallelizing both // reading and parsing. using InputCreator = std::function; // Incompatible with FileSegmentationEngine. using RandomAccessInputCreator = std::function; using OutputCreator = std::function; /// Some input formats can have non trivial readPrefix() and readSuffix(), /// so in some cases there is no possibility to use parallel parsing. /// The checker should return true if parallel parsing should be disabled. using NonTrivialPrefixAndSuffixChecker = std::function; /// Some formats can support append depending on settings. /// The checker should return true if format support append. using AppendSupportChecker = std::function; using SchemaReaderCreator = std::function; using ExternalSchemaReaderCreator = std::function; /// Some formats can extract different schemas from the same source depending on /// some settings. To process this case in schema cache we should add some additional /// information to a cache key. This getter should return some string with information /// about such settings. For example, for Protobuf format it's the path to the schema /// and the name of the message. using AdditionalInfoForSchemaCacheGetter = std::function; /// Some formats can support reading subset of columns depending on settings. /// The checker should return true if format support append. using SubsetOfColumnsSupportChecker = std::function; struct Creators { String name; InputCreator input_creator; RandomAccessInputCreator random_access_input_creator; OutputCreator output_creator; FileSegmentationEngineCreator file_segmentation_engine_creator; SchemaReaderCreator schema_reader_creator; ExternalSchemaReaderCreator external_schema_reader_creator; bool supports_parallel_formatting{false}; bool prefers_large_blocks{false}; NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker; AppendSupportChecker append_support_checker; AdditionalInfoForSchemaCacheGetter additional_info_for_schema_cache_getter; SubsetOfColumnsSupportChecker subset_of_columns_support_checker; }; using FormatsDictionary = std::unordered_map; using FileExtensionFormats = std::unordered_map; public: static FormatFactory & instance(); /// This has two tricks up its sleeve: /// * Parallel reading. /// To enable it, make sure `buf` is a SeekableReadBuffer implementing readBigAt(). /// * Parallel parsing. /// `buf` must outlive the returned IInputFormat. InputFormatPtr getInput( const String & name, ReadBuffer & buf, const Block & sample, const ContextPtr & context, UInt64 max_block_size, const std::optional & format_settings = std::nullopt, std::optional max_parsing_threads = std::nullopt, std::optional max_download_threads = std::nullopt, // affects things like buffer sizes and parallel reading bool is_remote_fs = false, // allows to do: buf -> parallel read -> decompression, // because parallel read after decompression is not possible CompressionMethod compression = CompressionMethod::None, bool need_only_count = false) const; /// Checks all preconditions. Returns ordinary format if parallel formatting cannot be done. OutputFormatPtr getOutputFormatParallelIfPossible( const String & name, WriteBuffer & buf, const Block & sample, const ContextPtr & context, const std::optional & format_settings = std::nullopt) const; OutputFormatPtr getOutputFormat( const String & name, WriteBuffer & buf, const Block & sample, const ContextPtr & context, const std::optional & _format_settings = std::nullopt) const; String getContentType( const String & name, const ContextPtr & context, const std::optional & format_settings = std::nullopt) const; SchemaReaderPtr getSchemaReader( const String & name, ReadBuffer & buf, const ContextPtr & context, const std::optional & format_settings = std::nullopt) const; ExternalSchemaReaderPtr getExternalSchemaReader( const String & name, const ContextPtr & context, const std::optional & format_settings = std::nullopt) const; void registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine); void registerFileSegmentationEngineCreator(const String & name, FileSegmentationEngineCreator file_segmentation_engine_creator); void registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker); void registerAppendSupportChecker(const String & name, AppendSupportChecker append_support_checker); /// If format always doesn't support append, you can use this method instead of /// registerAppendSupportChecker with append_support_checker that always returns true. void markFormatHasNoAppendSupport(const String & name); bool checkIfFormatSupportAppend(const String & name, const ContextPtr & context, const std::optional & format_settings_ = std::nullopt); /// Register format by its name. void registerInputFormat(const String & name, InputCreator input_creator); void registerRandomAccessInputFormat(const String & name, RandomAccessInputCreator input_creator); void registerOutputFormat(const String & name, OutputCreator output_creator); /// Register file extension for format void registerFileExtension(const String & extension, const String & format_name); String getFormatFromFileName(String file_name); std::optional tryGetFormatFromFileName(String file_name); String getFormatFromFileDescriptor(int fd); std::optional tryGetFormatFromFileDescriptor(int fd); /// Register schema readers for format its name. void registerSchemaReader(const String & name, SchemaReaderCreator schema_reader_creator); void registerExternalSchemaReader(const String & name, ExternalSchemaReaderCreator external_schema_reader_creator); void markOutputFormatSupportsParallelFormatting(const String & name); void markOutputFormatPrefersLargeBlocks(const String & name); void markFormatSupportsSubsetOfColumns(const String & name); void registerSubsetOfColumnsSupportChecker(const String & name, SubsetOfColumnsSupportChecker subset_of_columns_support_checker); bool checkIfFormatSupportsSubsetOfColumns(const String & name, const ContextPtr & context, const std::optional & format_settings_ = std::nullopt) const; bool checkIfFormatHasSchemaReader(const String & name) const; bool checkIfFormatHasExternalSchemaReader(const String & name) const; bool checkIfFormatHasAnySchemaReader(const String & name) const; bool checkIfOutputFormatPrefersLargeBlocks(const String & name) const; bool checkParallelizeOutputAfterReading(const String & name, const ContextPtr & context) const; void registerAdditionalInfoForSchemaCacheGetter(const String & name, AdditionalInfoForSchemaCacheGetter additional_info_for_schema_cache_getter); String getAdditionalInfoForSchemaCache(const String & name, const ContextPtr & context, const std::optional & format_settings_ = std::nullopt); const FormatsDictionary & getAllFormats() const { return dict; } std::vector getAllInputFormats() const; bool isInputFormat(const String & name) const; bool isOutputFormat(const String & name) const; /// Check that format with specified name exists and throw an exception otherwise. void checkFormatName(const String & name) const; bool exists(const String & name) const; private: FormatsDictionary dict; FileExtensionFormats file_extension_formats; const Creators & getCreators(const String & name) const; Creators & getOrCreateCreators(const String & name); // Creates a ReadBuffer to give to an input format. Returns nullptr if we should use `buf` directly. std::unique_ptr wrapReadBufferIfNeeded( ReadBuffer & buf, CompressionMethod compression, const Creators & creators, const FormatSettings & format_settings, const Settings & settings, bool is_remote_fs, size_t max_download_threads) const; }; }