#pragma once
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
namespace DB
{
/*
* Implementation of `IResourceManager` that creates hierarchy of scheduler nodes according to
* workload entities (WORKLOADs and RESOURCEs). It subscribes for updates in IWorkloadEntityStorage and
* creates hierarchy of UnifiedSchedulerNode identical to the hierarchy of WORKLOADs.
* For every RESOURCE an independent hierarchy of scheduler nodes is created.
*
* Manager process updates of WORKLOADs and RESOURCEs: CREATE/DROP/ALTER.
* When a RESOURCE is created (dropped) a corresponding scheduler nodes hierarchy is created (destroyed).
* After DROP RESOURCE parts of hierarchy might be kept alive while at least one query uses it.
*
* Manager is specific to IO only because it create scheduler node hierarchies for RESOURCEs having
* WRITE DISK and/or READ DISK definitions. CPU and memory resources are managed separately.
*
* Classifiers are used (1) to access IO resources and (2) to keep shared ownership of scheduling nodes.
* This allows `ResourceRequest` and `ResourceLink` to hold raw pointers as long as
* `ClassifierPtr` is acquired and held.
*
* === RESOURCE ARCHITECTURE ===
* Let's consider how a single resource is implemented. Every workload is represented by corresponding UnifiedSchedulerNode.
* Every UnifiedSchedulerNode manages its own subtree of ISchedulerNode objects (see details in UnifiedSchedulerNode.h)
* UnifiedSchedulerNode for workload w/o children has a queue, which provide a ResourceLink for consumption.
* Parent of the root workload for a resource is SchedulerRoot with its own scheduler thread.
* So every resource has its dedicated thread for processing of resource request and other events (see EventQueue).
*
* Here is an example of SQL and corresponding hierarchy of scheduler nodes:
* CREATE RESOURCE my_io_resource (...)
* CREATE WORKLOAD all
* CREATE WORKLOAD production PARENT all
* CREATE WORKLOAD development PARENT all
*
* root - SchedulerRoot (with scheduler thread and EventQueue)
* |
* all - UnifiedSchedulerNode
* |
* p0_fair - FairPolicy (part of parent UnifiedSchedulerNode internal structure)
* / \
* production development - UnifiedSchedulerNode
* | |
* queue queue - FifoQueue (part of parent UnifiedSchedulerNode internal structure)
*
* === UPDATING WORKLOADS ===
* Workload may be created, updated or deleted.
* Updating a child of a workload might lead to updating other workloads:
* 1. Workload itself: it's structure depend on settings of children workloads
* (e.g. fifo node of a leaf workload is remove when the first child is added;
* and a fair node is inserted after the first two children are added).
* 2. Other children: for them path to root might be changed (e.g. intermediate priority node is inserted)
*
* === VERSION CONTROL ===
* Versions are created on hierarchy updates and hold ownership of nodes that are used through raw pointers.
* Classifier reference version of every resource it use. Older version reference newer version.
* Here is a diagram explaining version control based on Version objects (for 1 resource):
*
* [nodes] [nodes] [nodes]
* ^ ^ ^
* | | |
* version1 --> version2 -...-> versionN
* ^ ^ ^
* | | |
* old_classifier new_classifier current_version
*
* Previous version should hold reference to a newer version. It is required for proper handling of updates.
* Classifiers that were created for any of old versions may use nodes of newer version due to updateNode().
* It may move a queue to a new position in the hierarchy or create/destroy constraints, thus resource requests
* created by old classifier may reference constraints of newer versions through `request->constraints` which
* is filled during dequeueRequest().
*
* === THREADS ===
* scheduler thread:
* - one thread per resource
* - uses event_queue (per resource) for processing w/o holding mutex for every scheduler node
* - handle resource requests
* - node activations
* - scheduler hierarchy updates
* query thread:
* - multiple independent threads
* - send resource requests
* - acquire and release classifiers (via scheduler event queues)
* control thread:
* - modify workload and resources through subscription
*
* === SYNCHRONIZATION ===
* List of related sync primitives and their roles:
* IOResourceManager::mutex
* - protects resource manager data structures - resource and workloads
* - serialize control thread actions
* IOResourceManager::Resource::scheduler->event_queue
* - serializes scheduler hierarchy events
* - events are created in control and query threads
* - all events are processed by specific scheduler thread
* - hierarchy-wide actions: requests dequeueing, activations propagation and nodes updates.
* - resource version control management
* FifoQueue::mutex and SemaphoreContraint::mutex
* - serializes query and scheduler threads on specific node accesses
* - resource request processing: enqueueRequest(), dequeueRequest() and finishRequest()
*/
class IOResourceManager : public IResourceManager
{
public:
explicit IOResourceManager(IWorkloadEntityStorage & storage_);
~IOResourceManager() override;
void updateConfiguration(const Poco::Util::AbstractConfiguration & config) override;
bool hasResource(const String & resource_name) const override;
ClassifierPtr acquire(const String & workload_name) override;
void forEachNode(VisitorFunc visitor) override;
private:
// Forward declarations
struct NodeInfo;
struct Version;
class Resource;
struct Workload;
class Classifier;
friend struct Workload;
using VersionPtr = std::shared_ptr;
using ResourcePtr = std::shared_ptr;
using WorkloadPtr = std::shared_ptr;
/// Helper for parsing workload AST for a specific resource
struct NodeInfo
{
String name; // Workload name
String parent; // Name of parent workload
SchedulingSettings settings; // Settings specific for a given resource
NodeInfo(const ASTPtr & ast, const String & resource_name);
};
/// Ownership control for scheduler nodes, which could be referenced by raw pointers
struct Version
{
std::vector nodes;
VersionPtr newer_version;
};
/// Holds a thread and hierarchy of unified scheduler nodes for specific RESOURCE
class Resource : public std::enable_shared_from_this, boost::noncopyable
{
public:
explicit Resource(const ASTPtr & resource_entity_);
~Resource();
const String & getName() const { return resource_name; }
/// Hierarchy management
void createNode(const NodeInfo & info);
void deleteNode(const NodeInfo & info);
void updateNode(const NodeInfo & old_info, const NodeInfo & new_info);
/// Updates resource entity
void updateResource(const ASTPtr & new_resource_entity);
/// Updates a classifier to contain a reference for specified workload
std::future attachClassifier(Classifier & classifier, const String & workload_name);
/// Remove classifier reference. This destroys scheduler nodes in proper scheduler thread
std::future detachClassifier(VersionPtr && version);
/// Introspection
void forEachResourceNode(IOResourceManager::VisitorFunc & visitor);
private:
void updateCurrentVersion();
template
void executeInSchedulerThread(Task && task)
{
std::promise promise;
auto future = promise.get_future();
scheduler.event_queue->enqueue([&]
{
try
{
task();
promise.set_value();
}
catch (...)
{
promise.set_exception(std::current_exception());
}
});
future.get(); // Blocks until execution is done in the scheduler thread
}
ASTPtr resource_entity;
const String resource_name;
SchedulerRoot scheduler;
// TODO(serxa): consider using resource_manager->mutex + scheduler thread for updates and mutex only for reading to avoid slow acquire/release of classifier
/// These field should be accessed only by the scheduler thread
std::unordered_map node_for_workload;
UnifiedSchedulerNodePtr root_node;
VersionPtr current_version;
};
struct Workload : boost::noncopyable
{
IOResourceManager * resource_manager;
ASTPtr workload_entity;
Workload(IOResourceManager * resource_manager_, const ASTPtr & workload_entity_);
~Workload();
void updateWorkload(const ASTPtr & new_entity);
String getParent() const;
};
class Classifier : public IClassifier
{
public:
~Classifier() override;
/// Implements IClassifier interface
/// NOTE: It is called from query threads (possibly multiple)
bool has(const String & resource_name) override;
ResourceLink get(const String & resource_name) override;
/// Attaches/detaches a specific resource
/// NOTE: It is called from scheduler threads (possibly multiple)
void attach(const ResourcePtr & resource, const VersionPtr & version, ResourceLink link);
void detach(const ResourcePtr & resource);
private:
IOResourceManager * resource_manager;
std::mutex mutex;
struct Attachment
{
ResourcePtr resource;
VersionPtr version;
ResourceLink link;
};
std::unordered_map attachments; // TSA_GUARDED_BY(mutex);
};
void createOrUpdateWorkload(const String & workload_name, const ASTPtr & ast);
void deleteWorkload(const String & workload_name);
void createOrUpdateResource(const String & resource_name, const ASTPtr & ast);
void deleteResource(const String & resource_name);
// Topological sorting of workloads
void topologicallySortedWorkloadsImpl(Workload * workload, std::unordered_set & visited, std::vector & sorted_workloads);
std::vector topologicallySortedWorkloads();
IWorkloadEntityStorage & storage;
scope_guard subscription;
mutable std::mutex mutex;
std::unordered_map workloads; // TSA_GUARDED_BY(mutex);
std::unordered_map resources; // TSA_GUARDED_BY(mutex);
LoggerPtr log;
};
}