#pragma once #include #include #include namespace DB { /// Also allows to set checkpoint at some position in stream and come back to this position later. /// When next() is called, saves data between checkpoint and current position to own memory and loads next data to sub-buffer /// Sub-buffer should not be accessed directly during the lifetime of peekable buffer (unless /// you reset() the state of peekable buffer after each change of underlying buffer) /// If position() of peekable buffer is explicitly set to some position before checkpoint /// (e.g. by istr.position() = prev_pos), behavior is undefined. class PeekableReadBuffer : public BufferWithOwnMemory { friend class PeekableReadBufferCheckpoint; public: explicit PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_ = 0); ~PeekableReadBuffer() override; void prefetch(Priority priority) override { sub_buf->prefetch(priority); } /// Sets checkpoint at current position ALWAYS_INLINE inline void setCheckpoint() { if (checkpoint) { /// Recursive checkpoints. We just remember offset from the /// first checkpoint to the current position. recursive_checkpoints_offsets.push(offsetFromCheckpoint()); return; } checkpoint_in_own_memory = currentlyReadFromOwnMemory(); if (!checkpoint_in_own_memory) { /// Don't need to store unread data anymore peeked_size = 0; } checkpoint.emplace(pos); } /// Forget checkpoint and all data between checkpoint and position ALWAYS_INLINE inline void dropCheckpoint() { assert(checkpoint); if (!recursive_checkpoints_offsets.empty()) { recursive_checkpoints_offsets.pop(); return; } if (!currentlyReadFromOwnMemory()) { /// Don't need to store unread data anymore peeked_size = 0; } checkpoint = std::nullopt; checkpoint_in_own_memory = false; } /// Sets position at checkpoint. /// All pointers (such as this->buffer().end()) may be invalidated void rollbackToCheckpoint(bool drop = false); /// If checkpoint and current position are in different buffers, appends data from sub-buffer to own memory, /// so data between checkpoint and position will be in continuous memory. void makeContinuousMemoryFromCheckpointToPos(); /// Returns true if there unread data extracted from sub-buffer in own memory. /// This data will be lost after destruction of peekable buffer. bool hasUnreadData() const; const ReadBuffer & getSubBuffer() const { return *sub_buf; } private: bool nextImpl() override; void resetImpl(); bool peekNext(); bool useSubbufferOnly() const { return !peeked_size; } bool currentlyReadFromOwnMemory() const { return working_buffer.begin() != sub_buf->buffer().begin(); } bool checkpointInOwnMemory() const { return checkpoint_in_own_memory; } void checkStateCorrect() const; /// Makes possible to append `bytes_to_append` bytes to data in own memory. /// Updates all invalidated pointers and sizes. void resizeOwnMemoryIfNecessary(size_t bytes_to_append); char * getMemoryData() { return use_stack_memory ? stack_memory : memory.data(); } const char * getMemoryData() const { return use_stack_memory ? stack_memory : memory.data(); } size_t offsetFromCheckpointInOwnMemory() const; size_t offsetFromCheckpoint() const; ReadBuffer * sub_buf; size_t peeked_size = 0; std::optional checkpoint = std::nullopt; bool checkpoint_in_own_memory = false; /// To prevent expensive and in some cases unnecessary memory allocations on PeekableReadBuffer /// creation (for example if PeekableReadBuffer is often created or if we need to remember small amount of /// data after checkpoint), at the beginning we will use small amount of memory on stack and allocate /// larger buffer only if reserved memory is not enough. char stack_memory[PADDING_FOR_SIMD]; bool use_stack_memory = true; std::stack recursive_checkpoints_offsets; }; class PeekableReadBufferCheckpoint : boost::noncopyable { PeekableReadBuffer & buf; bool auto_rollback; public: explicit PeekableReadBufferCheckpoint(PeekableReadBuffer & buf_, bool auto_rollback_ = false) : buf(buf_), auto_rollback(auto_rollback_) { buf.setCheckpoint(); } ~PeekableReadBufferCheckpoint() { if (!buf.checkpoint) return; if (auto_rollback) buf.rollbackToCheckpoint(); buf.dropCheckpoint(); } }; }