"""Integration test: async Distributed inserts across cluster topology changes.

Three ClickHouse instances are started.  The tests rewrite
``remote_servers.xml`` on every instance, reload the configuration, and then
check that rows inserted through a ``Distributed`` table are delivered to the
shards/replicas described by the *currently loaded* cluster definition.
"""

import pytest

from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)

node1 = cluster.add_instance(
    "node1",
    main_configs=["configs/remote_servers.xml"],
)
node2 = cluster.add_instance(
    "node2",
    main_configs=["configs/remote_servers.xml"],
)
node3 = cluster.add_instance(
    "node3",
    main_configs=["configs/remote_servers.xml"],
)

# Location of the cluster definition inside every container.
_REMOTE_SERVERS_PATH = "/etc/clickhouse-server/config.d/remote_servers.xml"

# Topology 1:
#   test_cluster                  -> 2 shards: node1 | node3
#   test_cluster_with_replication -> 2 shards: node1 | (node2, node3 replicas)
# NOTE(review): the tests start from configs/remote_servers.xml, which is not
# visible here; the first assertions of both tests assume it describes this
# same topology -- confirm.
config1 = """<clickhouse>
    <remote_servers>
        <test_cluster>
            <shard>
                <replica>
                    <host>node1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>node3</host>
                    <port>9000</port>
                </replica>
            </shard>
        </test_cluster>
        <test_cluster_with_replication>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>node1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>node2</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node3</host>
                    <port>9000</port>
                </replica>
            </shard>
        </test_cluster_with_replication>
    </remote_servers>
</clickhouse>
"""

# Topology 2:
#   test_cluster                  -> 3 shards: node1 | node2 | node3
#   test_cluster_with_replication -> 2 shards: node1 | node3   (node2 removed)
config2 = """<clickhouse>
    <remote_servers>
        <test_cluster>
            <shard>
                <replica>
                    <host>node1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>node2</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>node3</host>
                    <port>9000</port>
                </replica>
            </shard>
        </test_cluster>
        <test_cluster_with_replication>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>node1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>node3</host>
                    <port>9000</port>
                </replica>
            </shard>
        </test_cluster_with_replication>
    </remote_servers>
</clickhouse>
"""


def _apply_remote_servers_config(config):
    """Write *config* as remote_servers.xml on every node and reload it.

    Nodes are processed one at a time (replace, then reload) in the order
    node1, node2, node3.
    """
    for node in (node1, node2, node3):
        node.replace_config(_REMOTE_SERVERS_PATH, config)
        node.query("SYSTEM RELOAD CONFIG;")


def _insert_and_flush(table, tag, rows):
    """Insert *rows* rows tagged *tag* into Distributed *table* via node1.

    The pending asynchronous batches are flushed before returning, so the
    local tables can be inspected right afterwards.
    """
    node1.query(
        f"insert into {table} select number,'{tag}' from system.numbers limit {rows};"
    )
    node1.query(f"system flush distributed {table};")


def _count_rows(node, table, tag):
    """Return how many rows of local *table* on *node* have ``c2 = tag``."""
    return int(node.query(f"select count() from {table} where c2 = '{tag}'"))


@pytest.fixture(scope="module")
def started_cluster():
    """Start the cluster, create the test tables on every node, tear down after."""
    try:
        cluster.start()

        for node in cluster.instances.values():
            node.query(
                """
                create table dist_local (c1 Int32, c2 String) engine=MergeTree() order by c1;
                create table dist (c1 Int32, c2 String) engine=Distributed(test_cluster, currentDatabase(), dist_local, c1);

                create table replica_dist_local (c1 Int32, c2 String) engine=MergeTree() order by c1;
                create table replica_dist (c1 Int32, c2 String) engine=Distributed(test_cluster_with_replication, currentDatabase(), replica_dist_local, c1);
                """
            )

        yield cluster

    finally:
        cluster.shutdown()


def test_distributed_async_insert(started_cluster):
    """Rows follow `test_cluster` as a shard (node2) is added and removed."""
    # Initial topology: two shards, so 10 rows are expected to split 5/5.
    _insert_and_flush("dist", "A", 10)
    assert _count_rows(node3, "dist_local", "A") == 5
    assert _count_rows(node1, "dist_local", "A") == 5

    # Add node2: three shards, so 12 rows are expected to split 4/4/4.
    _apply_remote_servers_config(config2)
    _insert_and_flush("dist", "B", 12)
    assert _count_rows(node1, "dist_local", "B") == 4
    assert _count_rows(node2, "dist_local", "B") == 4
    assert _count_rows(node3, "dist_local", "B") == 4

    # Delete node2: back to two shards, node2 must receive nothing.
    _apply_remote_servers_config(config1)
    _insert_and_flush("dist", "C", 10)
    assert _count_rows(node1, "dist_local", "C") == 5
    assert _count_rows(node2, "dist_local", "C") == 0
    assert _count_rows(node3, "dist_local", "C") == 5


def test_distributed_async_insert_with_replica(started_cluster):
    """Rows follow `test_cluster_with_replication` as replica node2 goes and returns."""
    # Initial topology: node1 | (node2, node3).  The second shard's rows are
    # expected on exactly one of its two replicas.
    _insert_and_flush("replica_dist", "A", 10)
    node2_rows = _count_rows(node2, "replica_dist_local", "A")
    node3_rows = _count_rows(node3, "replica_dist_local", "A")
    assert _count_rows(node1, "replica_dist_local", "A") == 5
    assert sorted((node2_rows, node3_rows)) == [0, 5]

    # Delete node2: the second shard is now node3 only.
    _apply_remote_servers_config(config2)
    _insert_and_flush("replica_dist", "B", 10)
    assert _count_rows(node1, "replica_dist_local", "B") == 5
    assert _count_rows(node2, "replica_dist_local", "B") == 0
    assert _count_rows(node3, "replica_dist_local", "B") == 5

    # Add node2: the second shard has two replicas again.
    _apply_remote_servers_config(config1)
    _insert_and_flush("replica_dist", "C", 10)
    node2_rows = _count_rows(node2, "replica_dist_local", "C")
    node3_rows = _count_rows(node3, "replica_dist_local", "C")
    assert _count_rows(node1, "replica_dist_local", "C") == 5
    assert sorted((node2_rows, node3_rows)) == [0, 5]