#pragma once #include #include #include #include #include #include #include namespace Poco { class Logger; } namespace DB { class NATSConsumer { public: NATSConsumer( std::shared_ptr connection_, StorageNATS & storage_, std::vector & subjects_, const String & subscribe_queue_name, LoggerPtr log_, uint32_t queue_size_, const std::atomic & stopped_); struct MessageData { String message; String subject; }; void subscribe(); void unsubscribe(); size_t subjectsCount() { return subjects.size(); } bool isConsumerStopped() { return stopped; } bool queueEmpty() { return received.empty(); } size_t queueSize() { return received.size(); } auto getSubject() const { return current.subject; } const String & getCurrentMessage() const { return current.message; } /// Return read buffer containing next available message /// or nullptr if there are no messages to process. ReadBufferPtr consume(); private: static void onMsg(natsConnection * nc, natsSubscription * sub, natsMsg * msg, void * consumer); std::shared_ptr connection; StorageNATS & storage; std::vector subscriptions; std::vector subjects; LoggerPtr log; const std::atomic & stopped; bool subscribed = false; String queue_name; String channel_id; ConcurrentBoundedQueue received; MessageData current; }; }