#pragma once

#include <base/types.h>
#include <Core/Defines.h>
namespace DistributedCache
{
/// Config keys under which server/client settings live.
static constexpr auto SERVER_CONFIG_PREFIX = "distributed_cache_server";
static constexpr auto CLIENT_CONFIG_PREFIX = "distributed_cache_client";
/// ZooKeeper sub-path where alive cache servers register themselves.
static constexpr auto REGISTERED_SERVERS_PATH = "registry";
/// ZooKeeper sub-path storing `offset_alignment`
/// (shared between server and client, see read protocol Step1 below).
static constexpr auto OFFSET_ALIGNMENT_PATH = "offset_alignment";
static constexpr auto DEFAULT_ZOOKEEPER_PATH = "/distributed_cache/";
/// Virtual nodes per server on the consistent-hashing ring.
static constexpr auto MAX_VIRTUAL_NODES = 100;
/// 16 MiB default read-offset alignment.
static constexpr auto DEFAULT_OFFSET_ALIGNMENT = 16 * 1024 * 1024;
static constexpr auto DEFAULT_MAX_PACKET_SIZE = DB::DBMS_DEFAULT_BUFFER_SIZE;
static constexpr auto MAX_UNACKED_INFLIGHT_PACKETS = 10;
static constexpr auto ACK_DATA_PACKET_WINDOW = 5;
static constexpr auto DEFAULT_CONNECTION_POOL_SIZE = 15000;
static constexpr auto DEFAULT_CONNECTION_TTL_SEC = 200;

/// Protocol versions. One consistent type (UInt32) for all of them,
/// since the version is serialized on the wire.
static constexpr UInt32 INITIAL_PROTOCOL_VERSION = 0;
static constexpr UInt32 PROTOCOL_VERSION_WITH_QUERY_ID = 1;
static constexpr UInt32 PROTOCOL_VERSION_WITH_MAX_INFLIGHT_PACKETS = 2;
static constexpr UInt32 PROTOCOL_VERSION_WITH_GCS_TOKEN = 3;
static constexpr UInt32 PROTOCOL_VERSION_WITH_AZURE_AUTH = 4;
static constexpr UInt32 PROTOCOL_VERSION_WITH_TEMPORARY_DATA = 5;
/// Misspelled name kept as an alias for backward compatibility with existing callers.
static constexpr UInt32 PROTOCOL_VERSION_WITH_TEMPORATY_DATA = PROTOCOL_VERSION_WITH_TEMPORARY_DATA;
static constexpr UInt32 CURRENT_PROTOCOL_VERSION = PROTOCOL_VERSION_WITH_TEMPORARY_DATA;
namespace Protocol
{
/// NOTE(review): must stay equal to PROTOCOL_VERSION_WITH_QUERY_ID above — keep in sync.
static constexpr auto MIN_VERSION_WITH_QUERY_ID_IN_REQUEST = 1;
/**
* Distributed cache protocol.
*
* Read request:
* Step1: (Client) calculate aligned_offset = aligned(file_offset), i.e. file_offset rounded down to the alignment.
* The alignment is equal to `offset_alignment`
* (stored on zookeeper for shared access from server and client),
* which allows to guarantee that if the client needs offset x,
* then it will go to the server which contains a covering
* file segment for this offset.
* Step2: (Client) calculate hash(x, remote_path, aligned_file_offset) -> h,
* Step3: (Client) find distributed cache server: hash_ring(h) -> s
* Step4: (Client) connect to s:
* Client: `Hello` packet (protocol_version, request_type)
* Server: `Hello` packet (mutual_protocol_version)
* Step5: send general info:
* Client: `ReadInfo` packet (object storage connection info, remote paths, start offset, end offset)
* Step6:
* Server: `ReadRange` packet (includes read range), and send the data.
* Client: `Ok` packet
* in case of error (Client): `EndRequest` packet.
* Step7:
* Client: do Step1 from current file offset and get aligned_offset'.
* If aligned_offset' == aligned_offset, do Step6 again.
* else: go to Step2
*
* Write request:
* Step1: (Client) calculate hash(x, remote_path, file_offset) -> h,
* Step2: (Client) find distributed cache server: hash_ring(h) -> s
* Step3: (Client) connect to s:
* Client: `Hello` packet (protocol_version, request_type)
* Server: `Hello` packet (mutual_protocol_version)
* Step4: send general info:
* Client: `WriteInfo` packet (object storage connection info, remote_path, write range)
* Step5: write one file_segment's range
* Client: `WriteRange` packet (file_segment_start_offset), then process the write.
* Server: `Ok` (after each `Data` packet)
* or `Stop` packet (on error).
* Step6:
* if eof: do Step8
* else: do Step7
* Step7:
* do step1: h' = hash(x, remote_path, file_offset'), where file_offset' - start of the next file segment
* do step2: s' = hash_ring(h')
* if s' == s: do Step5
* else: do Step8 and go to Step3
* Step8:
* Client: `EndRequest` packet
* Server: `Ok` packet
*/
/// Type of a client request, sent during the initial handshake.
/// Values are part of the wire protocol — do not renumber.
enum RequestType
{
Min = 0,
Read = 1, /// read-through cache
Write = 2, /// write-through cache
Remove = 3, /// drop cache
Show = 4, /// get current cache state
CurrentMetrics = 5, /// get CurrentMetrics
ProfileEvents = 6, /// get ProfileEvents
/// NOTE(review): value 7 is unassigned and Max is 8, not 7 —
/// presumably a reserved slot or a leftover of a removed type; confirm.
Max = 8,
};
namespace Client
{
/// Packet types sent from client to server.
/// Values are part of the wire protocol — do not renumber.
enum Enum
{
Min = 0,
/// A hello packet for handshake between client and server.
Hello = 1,
/// A packet to start a new request: Read, Write, Remove, Show, etc
StartRequest = 2,
/// A packet to identify that the request is finished.
/// E.g. for a read request we no longer need to receive data (even if the requested read range is not finished);
/// for a write request no more data will be sent.
EndRequest = 3,
/// A request to continue already started request but with a new information.
/// E.g. for read request - a new read range is needed;
/// for write request - a new write range will be sent.
ContinueRequest = 4,
/// Acknowledgement of `data_packet_ack_window` processed `DataPacket` packets.
AckRequest = 5,
Max = 6,
};
}
namespace Server
{
/// Packet types sent from server to client.
/// Values are part of the wire protocol — do not renumber.
enum Enum
{
Min = 0,
/// A hello packet for handshake between client and server.
Hello = 1,
/// Identifies that a request was successfully executed.
Ok = 2,
/// Identifies a packet containing an exception message happened on server's side.
Error = 3,
/// Identifies a packet for a Read request.
ReadResult = 4,
/// Identifies a packet for incremental ProfileEvents during Read or Write request.
ProfileCounters = 5,
/// Identifies a packet for a Show request.
ShowResult = 6,
/// Identifies a packet for a ProfileEvents request.
ProfileEvents = 7,
/// Identifies a packet for a Metrics request.
Metrics = 8,
/// Identifies that this server cannot receive any more data for Write request
/// (cache is full or errors during insertion).
Stop = 9,
/// NOTE(review): value 10 is unassigned and Max is 11, not 10 —
/// presumably reserved for a future packet type; confirm.
Max = 11
};
}
}
}