#pragma once

#include "config.h"

#if USE_AWS_S3

/// NOTE(review): in this copy the header names of the #include directives below, and the template
/// arguments of several types in this file (std::shared_ptr, std::optional, std::deque, std::unique_ptr,
/// std::make_shared, ThreadPoolCallbackRunnerUnsafe), appear to have been stripped, so the header does
/// not compile as-is. TODO: restore them from the upstream source.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace DB
{

/// Forward declaration, so this header does not need TaskTracker's full definition
/// (presumably it is the pointee type of `task_tracker` below; confirm once template arguments are restored).
class TaskTracker;

/**
 * Buffer that writes data to an S3 object identified by the given bucket and key.
 * If the amount of data written to the buffer is less than 'max_single_part_upload_size',
 * the write is performed as a single-part upload.
 * Otherwise a multipart upload is used:
 * the data is divided into chunks larger than 'minimum_upload_part_size' (the last chunk may be
 * smaller than this threshold), and each chunk is uploaded to S3 as a separate part.
 */
class WriteBufferFromS3 final : public WriteBufferFromFileBase
{
public:
    /// Creates a buffer that writes to the object `key_` in `bucket_`.
    /// Defaults: `object_metadata_` is std::nullopt; `schedule_` and `write_settings_` are default-constructed.
    /// The constructor body is not in this header.
    WriteBufferFromS3(
        std::shared_ptr client_ptr_,
        const String & bucket_,
        const String & key_,
        size_t buf_size_,
        const S3::S3RequestSettings & request_settings_,
        BlobStorageLogWriterPtr blob_log_,
        std::optional> object_metadata_ = std::nullopt,
        ThreadPoolCallbackRunnerUnsafe schedule_ = {},
        const WriteSettings & write_settings_ = {});

    /// Defined out of line (not in this header).
    ~WriteBufferFromS3() override;

    void nextImpl() override;

    /// Expected to take effect only once; see `is_prefinalized`.
    void preFinalize() override;

    /// The S3 object key is reported as the file name.
    std::string getFileName() const override { return key; }

    /// sync() simply delegates to next(); no additional request is issued from this header.
    void sync() override { next(); }

private:
    /// Receives the response from the server after all data has been sent.
    void finalizeImpl() override;

    void cancelImpl() noexcept override;

    /// The helpers below are defined out of line; they are grouped here by name only.

    /// Logging helpers.
    String getVerboseLogDetails() const;
    String getShortLogDetails() const;

    /// Per-part payload type; its definition is not in this header.
    struct PartData;

    /// Working-buffer management.
    void hidePartialData();
    void reallocateFirstBuffer();
    void detachBuffer();
    void allocateBuffer();
    void setFakeBufferWhenPreFinalized();

    /// Multipart upload steps.
    S3::UploadPartRequest getUploadRequest(size_t part_number, PartData & data);
    void writePart(PartData && data);
    void writeMultipartUpload();
    void createMultipartUpload();
    void completeMultipartUpload();
    void abortMultipartUpload();
    /// Declared noexcept, unlike abortMultipartUpload().
    void tryToAbortMultipartUpload() noexcept;

    /// Single-part upload.
    S3::PutObjectRequest getPutRequest(PartData & data);
    void makeSinglepartUpload(PartData && data);

    /// Destination and configuration; const, so fixed once the object is constructed.
    const String bucket;
    const String key;
    const S3::S3RequestSettings request_settings;
    const WriteSettings write_settings;
    const std::shared_ptr client_ptr;
    const std::optional> object_metadata;

    LoggerPtr log = getLogger("WriteBufferFromS3");
    /// Keep this declared after `log`: members are initialized in declaration order and this
    /// initializer reads `log`.
    /// NOTE(review): presumably a rate-limited wrapper around `log`; the meaning of the arguments
    /// (1, 5) is defined by LogSeriesLimiter. Confirm there.
    LogSeriesLimiterPtr limited_log = std::make_shared(log, 1, 5);

    BufferAllocationPolicyPtr buffer_allocation_policy;

    /// An S3 upload is made in parts: we initiate the upload, upload each part and receive an ETag
    /// in response, and then finalizeImpl() completes the upload by listing all our parts.
    String multipart_upload_id;
    std::deque multipart_tags;
    std::deque multipart_checksums; // only used if enabled (NOTE(review): confirm which setting controls this)
    bool multipart_upload_finished = false;

    /// Tracks whether preFinalize() has already been called, so that it runs only once.
    bool is_prefinalized = false;

    /// The first fully filled buffer has to be held back (delayed).
    /// What happens next depends on the caller:
    ///  - calling preFinalize()/finalize() leads to a single-part upload;
    ///  - writing more data leads to a multipart upload.
    std::deque detached_part_data;

    /// One-byte buffer, presumably installed by setFakeBufferWhenPreFinalized(); confirm in the .cpp.
    char fake_buffer_when_prefinalized[1] = {};

    /// offset() and count() are unstable inside nextImpl(): for example, nextImpl() changes the
    /// position, so offset() and count() change as well.
    /// These variables store size information for the periods when offset() and count() are unstable.
    size_t total_size = 0;
    size_t hidden_size = 0;

    std::unique_ptr task_tracker;

    BlobStorageLogWriterPtr blob_log;
};

}

#endif