#pragma once #include #include #include #include #include #include #include "config.h" namespace Poco { class Logger; } namespace DB { struct AsyncReadCounters; using AsyncReadCountersPtr = std::shared_ptr; class ReadBufferFromRemoteFSGather; class AsynchronousBoundedReadBuffer : public ReadBufferFromFileBase { public: using Impl = ReadBufferFromFileBase; using ImplPtr = std::unique_ptr; explicit AsynchronousBoundedReadBuffer( ImplPtr impl_, IAsynchronousReader & reader_, const ReadSettings & settings_, size_t buffer_size_, AsyncReadCountersPtr async_read_counters_ = nullptr, FilesystemReadPrefetchesLogPtr prefetches_log_ = nullptr); ~AsynchronousBoundedReadBuffer() override; String getFileName() const override { return impl->getFileName(); } std::optional tryGetFileSize() override { return impl->tryGetFileSize(); } String getInfoForLog() override { return impl->getInfoForLog(); } off_t seek(off_t offset_, int whence) override; void prefetch(Priority priority) override; void setReadUntilPosition(size_t position) override; /// [..., position). void setReadUntilEnd() override { setReadUntilPosition(getFileSize()); } size_t getFileOffsetOfBufferEnd() const override { return file_offset_of_buffer_end; } off_t getPosition() override { return file_offset_of_buffer_end - available() + bytes_to_ignore; } private: const ImplPtr impl; const ReadSettings read_settings; const size_t buffer_size; IAsynchronousReader & reader; size_t file_offset_of_buffer_end = 0; std::optional read_until_position; /// If nonzero then working_buffer is empty. /// If a prefetch is in flight, the prefetch task has been instructed to ignore this many bytes. size_t bytes_to_ignore = 0; Memory<> prefetch_buffer; std::future prefetch_future; const std::string query_id; const std::string current_reader_id; LoggerPtr log; AsyncReadCountersPtr async_read_counters; FilesystemReadPrefetchesLogPtr prefetches_log; struct LastPrefetchInfo { std::chrono::system_clock::time_point submit_time; Priority priority; }; LastPrefetchInfo last_prefetch_info; bool nextImpl() override; void finalize(); bool hasPendingDataToRead(); void appendToPrefetchLog( FilesystemPrefetchState state, int64_t size, const std::unique_ptr & execution_watch); std::future readAsync(char * data, size_t size, Priority priority); IAsynchronousReader::Result readSync(char * data, size_t size); void resetPrefetch(FilesystemPrefetchState state); }; }