#pragma once #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int ROCKSDB_ERROR; extern const int LOGICAL_ERROR; } /// The key-value format of rocks db will be /// - key: Int8 (depth of the path) + String (path) /// - value: SizeOf(keeperRocksNodeInfo) (meta of the node) + String (data) template struct RocksDBContainer { using Node = Node_; private: /// MockNode is only use in test to mock `getChildren()` and `getData()` struct MockNode { std::vector children; std::string data; MockNode(size_t children_num, std::string_view data_) : children(std::vector(children_num)), data(data_) { } std::vector getChildren() { return children; } std::string getData() { return data; } }; UInt16 getKeyDepth(const std::string & key) { UInt16 depth = 0; for (size_t i = 0; i < key.size(); i++) { if (key[i] == '/' && i + 1 != key.size()) depth ++; } return depth; } std::string getEncodedKey(const std::string & key, bool child_prefix = false) { WriteBufferFromOwnString key_buffer; UInt16 depth = getKeyDepth(key) + (child_prefix ? 1 : 0); writeIntBinary(depth, key_buffer); writeString(key, key_buffer); return key_buffer.str(); } static std::string_view getDecodedKey(const std::string_view & key) { return std::string_view(key.begin() + 2, key.end()); } struct KVPair { StringRef key; Node value; }; using ValueUpdater = std::function; public: /// This is an iterator wrapping rocksdb iterator and the kv result. struct const_iterator { std::shared_ptr iter; std::shared_ptr pair; const_iterator() = default; explicit const_iterator(std::shared_ptr pair_) : pair(std::move(pair_)) {} explicit const_iterator(rocksdb::Iterator * iter_) : iter(iter_) { updatePairFromIter(); } const KVPair & operator * () const { return *pair; } const KVPair * operator->() const { return pair.get(); } bool operator != (const const_iterator & other) const { return !(*this == other); } bool operator == (const const_iterator & other) const { if (pair == nullptr && other == nullptr) return true; if (pair == nullptr || other == nullptr) return false; return pair->key.toView() == other->key.toView() && iter == other.iter; } bool operator == (std::nullptr_t) const { return iter == nullptr; } bool operator != (std::nullptr_t) const { return iter != nullptr; } explicit operator bool() const { return iter != nullptr; } const_iterator & operator ++() { iter->Next(); updatePairFromIter(); return *this; } private: void updatePairFromIter() { if (iter && iter->Valid()) { auto new_pair = std::make_shared(); new_pair->key = StringRef(getDecodedKey(iter->key().ToStringView())); ReadBufferFromOwnString buffer(iter->value().ToStringView()); typename Node::Meta & meta = new_pair->value; readPODBinary(meta, buffer); readVarUInt(new_pair->value.stats.data_size, buffer); if (new_pair->value.stats.data_size) { new_pair->value.data = std::unique_ptr(new char[new_pair->value.stats.data_size]); buffer.readStrict(new_pair->value.data.get(), new_pair->value.stats.data_size); } pair = new_pair; } else { pair = nullptr; iter = nullptr; } } }; bool initialized = false; const const_iterator end_ptr; void initialize(const KeeperContextPtr & context) { DiskPtr disk = context->getTemporaryRocksDBDisk(); if (disk == nullptr) { throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot get rocksdb disk"); } auto options = context->getRocksDBOptions(); if (options == nullptr) { throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot get rocksdb options"); } rocksdb_dir = disk->getPath(); rocksdb::DB * db; auto status = rocksdb::DB::Open(*options, rocksdb_dir, &db); if (!status.ok()) { throw Exception(ErrorCodes::ROCKSDB_ERROR, "Failed to open rocksdb path at: {}: {}", rocksdb_dir, status.ToString()); } rocksdb_ptr = std::unique_ptr(db); write_options.disableWAL = true; initialized = true; } ~RocksDBContainer() { if (initialized) { rocksdb_ptr->Close(); rocksdb_ptr = nullptr; std::filesystem::remove_all(rocksdb_dir); } } std::vector> getChildren(const std::string & key_, bool read_data = false) { rocksdb::ReadOptions read_options; read_options.total_order_seek = true; std::string key = key_; if (!key.ends_with('/')) key += '/'; size_t len = key.size() + 2; auto iter = std::unique_ptr(rocksdb_ptr->NewIterator(read_options)); std::string encoded_string = getEncodedKey(key, true); rocksdb::Slice prefix(encoded_string); std::vector> result; for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { Node node; ReadBufferFromOwnString buffer(iter->value().ToStringView()); typename Node::Meta & meta = node; /// We do not read data here readPODBinary(meta, buffer); if (read_data) { readVarUInt(meta.stats.data_size, buffer); if (meta.stats.data_size) { node.data = std::unique_ptr(new char[meta.stats.data_size]); buffer.readStrict(node.data.get(), meta.stats.data_size); } } std::string real_key(iter->key().data() + len, iter->key().size() - len); // std::cout << "real key: " << real_key << std::endl; result.emplace_back(std::move(real_key), std::move(node)); } return result; } bool contains(const std::string & path) { const std::string & encoded_key = getEncodedKey(path); std::string buffer_str; rocksdb::Status status = rocksdb_ptr->Get(rocksdb::ReadOptions(), encoded_key, &buffer_str); if (status.IsNotFound()) return false; if (!status.ok()) throw Exception(ErrorCodes::ROCKSDB_ERROR, "Got rocksdb error during executing contains. The error message is {}.", status.ToString()); return true; } const_iterator find(StringRef key_) { /// rocksdb::PinnableSlice slice; const std::string & encoded_key = getEncodedKey(key_.toString()); std::string buffer_str; rocksdb::Status status = rocksdb_ptr->Get(rocksdb::ReadOptions(), encoded_key, &buffer_str); if (status.IsNotFound()) return end(); if (!status.ok()) throw Exception(ErrorCodes::ROCKSDB_ERROR, "Got rocksdb error during executing find. The error message is {}.", status.ToString()); ReadBufferFromOwnString buffer(buffer_str); auto kv = std::make_shared(); kv->key = key_; typename Node::Meta & meta = kv->value; readPODBinary(meta, buffer); /// TODO: Sometimes we don't need to load data. readVarUInt(kv->value.stats.data_size, buffer); if (kv->value.stats.data_size) { kv->value.data = std::unique_ptr(new char[kv->value.stats.data_size]); buffer.readStrict(kv->value.data.get(), kv->value.stats.data_size); } return const_iterator(kv); } MockNode getValue(StringRef key) { auto it = find(key); chassert(it != end()); return MockNode(it->value.stats.numChildren(), it->value.getData()); } const_iterator updateValue(StringRef key_, ValueUpdater updater) { /// rocksdb::PinnableSlice slice; const std::string & key = key_.toString(); const std::string & encoded_key = getEncodedKey(key); std::string buffer_str; rocksdb::Status status = rocksdb_ptr->Get(rocksdb::ReadOptions(), encoded_key, &buffer_str); if (!status.ok()) throw Exception(ErrorCodes::ROCKSDB_ERROR, "Got rocksdb error during find. The error message is {}.", status.ToString()); auto kv = std::make_shared(); kv->key = key_; kv->value.decodeFromString(buffer_str); /// storage->removeDigest(node, key); updater(kv->value); insertOrReplace(key, kv->value); return const_iterator(kv); } bool insert(const std::string & key, Node & value) { std::string value_str; const std::string & encoded_key = getEncodedKey(key); rocksdb::Status status = rocksdb_ptr->Get(rocksdb::ReadOptions(), encoded_key, &value_str); if (status.ok()) { return false; } if (status.IsNotFound()) { status = rocksdb_ptr->Put(write_options, encoded_key, value.getEncodedString()); if (status.ok()) { counter++; return true; } } throw Exception(ErrorCodes::ROCKSDB_ERROR, "Got rocksdb error during insert. The error message is {}.", status.ToString()); } void insertOrReplace(const std::string & key, Node & value) { const std::string & encoded_key = getEncodedKey(key); /// storage->addDigest(value, key); std::string value_str; rocksdb::Status status = rocksdb_ptr->Get(rocksdb::ReadOptions(), encoded_key, &value_str); bool increase_counter = false; if (status.IsNotFound()) increase_counter = true; else if (!status.ok()) throw Exception(ErrorCodes::ROCKSDB_ERROR, "Got rocksdb error during get. The error message is {}.", status.ToString()); status = rocksdb_ptr->Put(write_options, encoded_key, value.getEncodedString()); if (status.ok()) counter += increase_counter; else throw Exception(ErrorCodes::ROCKSDB_ERROR, "Got rocksdb error during insert. The error message is {}.", status.ToString()); } using KeyPtr = std::unique_ptr; /// To be compatible with SnapshotableHashTable, will remove later; KeyPtr allocateKey(size_t size) { return KeyPtr{new char[size]}; } void insertOrReplace(KeyPtr key_data, size_t key_size, Node value) { std::string key(key_data.get(), key_size); insertOrReplace(key, value); } bool erase(const std::string & key) { /// storage->removeDigest(value, key); const std::string & encoded_key = getEncodedKey(key); auto status = rocksdb_ptr->Delete(write_options, encoded_key); if (status.IsNotFound()) return false; if (status.ok()) { counter--; return true; } throw Exception(ErrorCodes::ROCKSDB_ERROR, "Got rocksdb error during erase. The error message is {}.", status.ToString()); } void recalculateDataSize() {} void reverse(size_t size_) {(void)size_;} uint64_t getApproximateDataSize() const { /// use statistics from rocksdb return counter * sizeof(Node); } void enableSnapshotMode(size_t version) { chassert(!snapshot_mode); snapshot_mode = true; snapshot_up_to_version = version; snapshot_size = counter; ++current_version; snapshot = rocksdb_ptr->GetSnapshot(); } void disableSnapshotMode() { chassert(snapshot_mode); snapshot_mode = false; rocksdb_ptr->ReleaseSnapshot(snapshot); } void clearOutdatedNodes() {} std::pair snapshotSizeWithVersion() const { if (!snapshot_mode) return std::make_pair(counter, current_version); return std::make_pair(snapshot_size, current_version); } const_iterator begin() const { rocksdb::ReadOptions read_options; read_options.total_order_seek = true; if (snapshot_mode) read_options.snapshot = snapshot; auto * iter = rocksdb_ptr->NewIterator(read_options); iter->SeekToFirst(); return const_iterator(iter); } const_iterator end() const { return end_ptr; } size_t size() const { return counter; } uint64_t getArenaDataSize() const { return 0; } uint64_t keyArenaSize() const { return 0; } private: String rocksdb_dir; std::unique_ptr rocksdb_ptr; rocksdb::WriteOptions write_options; const rocksdb::Snapshot * snapshot; bool snapshot_mode{false}; size_t current_version{0}; size_t snapshot_up_to_version{0}; size_t snapshot_size{0}; size_t counter{0}; }; }