#pragma once #include #include #include #include #include #include #include #include #include #include #include namespace ProfileEvents { extern const Event SchedulerIOReadRequests; extern const Event SchedulerIOReadBytes; extern const Event SchedulerIOReadWaitMicroseconds; extern const Event SchedulerIOWriteRequests; extern const Event SchedulerIOWriteBytes; extern const Event SchedulerIOWriteWaitMicroseconds; } namespace CurrentMetrics { extern const Metric SchedulerIOReadScheduled; extern const Metric SchedulerIOWriteScheduled; } namespace DB { namespace ErrorCodes { extern const int RESOURCE_ACCESS_DENIED; } /* * Scoped resource guard. * Waits for resource to be available in constructor and releases resource in destructor * IMPORTANT: multiple resources should not be locked concurrently by a single thread */ class ResourceGuard { public: enum class Lock { Default, /// Locks inside constructor // WARNING: Only for tests. It is not exception-safe because `lock()` must be called after construction. Defer /// Don't lock in constructor, but send request }; struct Metrics { const ProfileEvents::Event requests = ProfileEvents::end(); const ProfileEvents::Event cost = ProfileEvents::end(); const ProfileEvents::Event wait_microseconds = ProfileEvents::end(); const CurrentMetrics::Metric scheduled_count = CurrentMetrics::end(); static const Metrics * getIORead() { static Metrics metrics{ .requests = ProfileEvents::SchedulerIOReadRequests, .cost = ProfileEvents::SchedulerIOReadBytes, .wait_microseconds = ProfileEvents::SchedulerIOReadWaitMicroseconds, .scheduled_count = CurrentMetrics::SchedulerIOReadScheduled }; return &metrics; } static const Metrics * getIOWrite() { static Metrics metrics{ .requests = ProfileEvents::SchedulerIOWriteRequests, .cost = ProfileEvents::SchedulerIOWriteBytes, .wait_microseconds = ProfileEvents::SchedulerIOWriteWaitMicroseconds, .scheduled_count = CurrentMetrics::SchedulerIOWriteScheduled }; return &metrics; } }; enum RequestState { Finished, // Last request has already finished; no concurrent access is possible Enqueued, // Enqueued into the scheduler; thread-safe access is required Dequeued // Dequeued from the scheduler and is in consumption state; no concurrent access is possible }; class Request : public ResourceRequest { public: void enqueue(ResourceCost cost_, ResourceLink link_) { // lock(mutex) is not required because `Finished` request cannot be used by the scheduler thread chassert(state == Finished); state = Enqueued; ResourceRequest::reset(cost_); estimated_cost = link_.queue->enqueueRequestUsingBudget(this); // NOTE: it modifies `cost` and enqueues request } // This function is executed inside scheduler thread and wakes thread issued this `request`. // That thread will continue execution and do real consumption of requested resource synchronously. void execute() override { std::unique_lock lock(mutex); chassert(state == Enqueued); state = Dequeued; dequeued_cv.notify_one(); } // This function is executed inside scheduler thread and wakes thread that issued this `request`. // That thread will throw an exception. void failed(const std::exception_ptr & ptr) override { std::unique_lock lock(mutex); chassert(state == Enqueued); state = Dequeued; exception = ptr; dequeued_cv.notify_one(); } void wait() { CurrentMetrics::Increment scheduled(metrics->scheduled_count); auto timer = CurrentThread::getProfileEvents().timer(metrics->wait_microseconds); std::unique_lock lock(mutex); dequeued_cv.wait(lock, [this] { return state == Dequeued; }); if (exception) throw Exception(ErrorCodes::RESOURCE_ACCESS_DENIED, "Resource request failed: {}", getExceptionMessage(exception, /* with_stacktrace = */ false)); } void finish(ResourceCost real_cost_, ResourceLink link_) { // lock(mutex) is not required because `Dequeued` request cannot be used by the scheduler thread chassert(state == Dequeued); state = Finished; if (estimated_cost != real_cost_) link_.queue->adjustBudget(estimated_cost, real_cost_); ResourceRequest::finish(); ProfileEvents::increment(metrics->requests); ProfileEvents::increment(metrics->cost, real_cost_); } void assertFinished() { // lock(mutex) is not required because `Finished` request cannot be used by the scheduler thread chassert(state == Finished); } static Request & local(const Metrics * metrics) { // Since single thread cannot use more than one resource request simultaneously, // we can reuse thread-local request to avoid allocations static thread_local Request instance; instance.metrics = metrics; return instance; } const Metrics * metrics = nullptr; // Must be initialized before use private: ResourceCost estimated_cost = 0; // Stores initial `cost` value in case budget was used to modify it std::mutex mutex; std::condition_variable dequeued_cv; RequestState state = Finished; std::exception_ptr exception; }; /// Creates pending request for resource; blocks while resource is not available (unless `Lock::Defer`) explicit ResourceGuard(const Metrics * metrics, ResourceLink link_, ResourceCost cost = 1, ResourceGuard::Lock type = ResourceGuard::Lock::Default) : link(link_) , request(Request::local(metrics)) { if (cost == 0) link.reset(); // Ignore zero-cost requests else if (link) { request.enqueue(cost, link); if (type == Lock::Default) request.wait(); } } ~ResourceGuard() { unlock(); } /// Blocks until resource is available void lock() { if (link) request.wait(); } void consume(ResourceCost cost) { real_cost += cost; } /// Report resource consumption has finished void unlock(ResourceCost consumed = 0) { consume(consumed); if (link) { request.finish(real_cost, link); link.reset(); } } ResourceLink link; Request & request; ResourceCost real_cost = 0; }; }