#pragma once #include #include namespace DB { /** Has arbitrary non zero number of inputs and arbitrary non zero number of outputs. * All of them have the same structure. * * Pulls data from arbitrary input (whenever it is ready) and pushes it to arbitrary output (whenever it is not full). * Doesn't do any heavy calculations. * Doesn't preserve an order of data. * * Examples: * - union data from multiple inputs to single output - to serialize data that was processed in parallel. * - split data from single input to multiple outputs - to allow further parallel processing. */ class ResizeProcessor final : public IProcessor { public: /// TODO Check that there is non zero number of inputs and outputs. ResizeProcessor(const Block & header, size_t num_inputs, size_t num_outputs) : IProcessor(InputPorts(num_inputs, header), OutputPorts(num_outputs, header)) , current_input(inputs.begin()) , current_output(outputs.begin()) { } String getName() const override { return "Resize"; } Status prepare() override; Status prepare(const PortNumbers &, const PortNumbers &) override; private: InputPorts::iterator current_input; OutputPorts::iterator current_output; size_t num_finished_inputs = 0; size_t num_finished_outputs = 0; std::queue waiting_outputs; std::queue inputs_with_data; bool initialized = false; bool is_reading_started = false; enum class OutputStatus : uint8_t { NotActive, NeedData, Finished, }; enum class InputStatus : uint8_t { NotActive, HasData, Finished, }; struct InputPortWithStatus { InputPort * port; InputStatus status; }; struct OutputPortWithStatus { OutputPort * port; OutputStatus status; }; std::vector input_ports; std::vector output_ports; }; class StrictResizeProcessor : public IProcessor { public: /// TODO Check that there is non zero number of inputs and outputs. StrictResizeProcessor(const Block & header, size_t num_inputs, size_t num_outputs) : IProcessor(InputPorts(num_inputs, header), OutputPorts(num_outputs, header)) , current_input(inputs.begin()) , current_output(outputs.begin()) { } StrictResizeProcessor(InputPorts inputs_, OutputPorts outputs_) : IProcessor(inputs_, outputs_) , current_input(inputs.begin()) , current_output(outputs.begin()) { } String getName() const override { return "StrictResize"; } Status prepare(const PortNumbers &, const PortNumbers &) override; private: InputPorts::iterator current_input; OutputPorts::iterator current_output; size_t num_finished_inputs = 0; size_t num_finished_outputs = 0; std::queue disabled_input_ports; std::queue waiting_outputs; bool initialized = false; enum class OutputStatus : uint8_t { NotActive, NeedData, Finished, }; enum class InputStatus : uint8_t { NotActive, NeedData, Finished, }; struct InputPortWithStatus { InputPort * port; InputStatus status; ssize_t waiting_output; }; struct OutputPortWithStatus { OutputPort * port; OutputStatus status; }; std::vector input_ports; std::vector output_ports; /// This field contained chunks which were read for output which had became finished while reading was happening. /// They will be pushed to any next waiting output. std::vector abandoned_chunks; }; }