mirror of
https://github.com/20kaushik02/real-time-traffic-analysis-clickhouse.git
synced 2026-01-25 08:04:04 +00:00
cc map, full scale data testing
This commit is contained in:
@@ -10,13 +10,14 @@ if __name__ == "__main__":
|
||||
|
||||
# extracting details of each running container in json format
|
||||
try:
|
||||
all_services = subprocess.check_output(["docker","ps","--format","json"],text=True).split('\n')[:-1]
|
||||
all_services = subprocess.check_output(["sudo", "docker","service","ls","--format","json"],text=True).split('\n')[:-1]
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"Command failed with return code {e.returncode}")
|
||||
|
||||
all_services = [json.loads(s) for s in all_services]
|
||||
# extracting the name, removing the custom id from it and storing it in a list
|
||||
all_service_names = [service['Names'].split('.')[0] for service in all_services if re.findall(r'clickhouse-server',service['Names'])]
|
||||
# all_service_names = [service['Names'].split('.')[0] for service in all_services if re.findall(r'clickhouse-server',service['Names'])]
|
||||
all_service_names = [service['Name'] for service in all_services if re.findall(r'clickhouse-server',service['Name'])]
|
||||
# extracting only 'server1','server2'...
|
||||
all_service_names = [ name.split('-')[-1] for name in all_service_names]
|
||||
|
||||
@@ -41,7 +42,7 @@ if __name__ == "__main__":
|
||||
</shard>
|
||||
'''
|
||||
# extracting existing remote-servers file
|
||||
with open('../node1-config/remote-servers.xml','r') as f:
|
||||
with open('../clickhouse/node1-config/remote-servers.xml','r') as f:
|
||||
curr_remote_servers_xml = ET.parse(f)
|
||||
|
||||
cluster_root = curr_remote_servers_xml.find('.//cluster_1S_2R')
|
||||
@@ -49,20 +50,20 @@ if __name__ == "__main__":
|
||||
cluster_root.append(new_shard_xml)
|
||||
|
||||
# creating folders for new servers that contain the configuration files
|
||||
os.makedirs(f'../node{curr_num_servers+1}-config',exist_ok=True)
|
||||
os.makedirs(f'../node{curr_num_servers+2}-config',exist_ok=True)
|
||||
os.makedirs(f'../clickhouse/node{curr_num_servers+1}-config',exist_ok=True)
|
||||
os.makedirs(f'../clickhouse/node{curr_num_servers+2}-config',exist_ok=True)
|
||||
|
||||
# adding the new shard to each remote-servers file
|
||||
for i in range(1,curr_num_servers+3):
|
||||
output_path = f'../node{i}-config/remote-servers.xml'
|
||||
output_path = f'../clickhouse/node{i}-config/remote-servers.xml'
|
||||
curr_remote_servers_xml.write(output_path, encoding='utf-8', xml_declaration=False)
|
||||
|
||||
env = Environment(loader=FileSystemLoader('../jinja-templates'))
|
||||
env = Environment(loader=FileSystemLoader('../clickhouse/jinja-templates'))
|
||||
service_template = env.get_template('service.yml.jinja')
|
||||
volume_template = env.get_template('volume.yml.jinja')
|
||||
|
||||
# loading existing docker-compose file
|
||||
with open('../docker-compose.yaml','r') as f:
|
||||
with open('../clickhouse/docker-compose.yaml','r') as f:
|
||||
compose_f = yaml.safe_load(f)
|
||||
|
||||
# rendering the new service
|
||||
@@ -79,7 +80,7 @@ if __name__ == "__main__":
|
||||
compose_f['volumes'].update(new_volume2)
|
||||
|
||||
if compose_f:
|
||||
with open('../docker-compose.yaml','w') as yamlfile:
|
||||
with open('../clickhouse/docker-compose.yaml','w') as yamlfile:
|
||||
yaml.safe_dump(compose_f, yamlfile)
|
||||
|
||||
config_template = env.get_template('config.xml.jinja')
|
||||
@@ -89,18 +90,18 @@ if __name__ == "__main__":
|
||||
|
||||
for i in range(1,3):
|
||||
config_content = config_template.render(node_num=curr_num_servers+i)
|
||||
with open(f'../node{curr_num_servers + i}-config/config.xml','w') as f1:
|
||||
with open(f'../clickhouse/node{curr_num_servers + i}-config/config.xml','w') as f1:
|
||||
f1.write(config_content)
|
||||
|
||||
macros_content = macros_template.render(shard_num="0"+str(int(curr_num_shards+1)),replica_num=i)
|
||||
with open(f'../node{curr_num_servers + i}-config/macros.xml','w') as f2:
|
||||
with open(f'../clickhouse/node{curr_num_servers + i}-config/macros.xml','w') as f2:
|
||||
f2.write(macros_content)
|
||||
|
||||
use_keeper_content = use_keeper_template.render()
|
||||
with open(f'../node{curr_num_servers + i}-config/use-keeper.xml','w') as f3:
|
||||
with open(f'../clickhouse/node{curr_num_servers + i}-config/use-keeper.xml','w') as f3:
|
||||
f3.write(use_keeper_content)
|
||||
|
||||
storage_policy_content = storage_policy_template.render(server_num=curr_num_servers+i)
|
||||
with open(f'../node{curr_num_servers + i}-config/storage-policy.xml','w') as f4:
|
||||
with open(f'../clickhouse/node{curr_num_servers + i}-config/storage-policy.xml','w') as f4:
|
||||
f4.write(storage_policy_content)
|
||||
|
||||
|
||||
@@ -7,28 +7,34 @@ import time
|
||||
def check_util_exec():
|
||||
# extracting details of each running container in json format
|
||||
try:
|
||||
all_services = subprocess.check_output(["docker","stats","--no-stream","--format","json"],text=True).split('\n')[:-1]
|
||||
all_services = subprocess.check_output(["sudo", "docker","stats","--no-stream","--format","json"],text=True).split('\n')[:-1]
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"Command failed with return code {e.returncode}")
|
||||
|
||||
all_services = [json.loads(s) for s in all_services]
|
||||
|
||||
resource_util_exceed_flag = True # Flag to check if all of the containers have exceeded 80% memory utilization
|
||||
resource_util_exceed_flag = True # Flag to check if all of the containers have exceeded 80% memory utilization
|
||||
for service in all_services:
|
||||
if re.findall(r'clickhouse-server',service['Name']):
|
||||
if float(service['MemPerc'][:-1]) < 80:
|
||||
if float(service['MemPerc'][:-1]) < 60:
|
||||
resource_util_exceed_flag = False
|
||||
|
||||
if resource_util_exceed_flag:
|
||||
process = subprocess.Popen(['python3','update_compose.py'],text=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
|
||||
process = subprocess.Popen(['python3','../clickhouse/update_config_scripts/update_compose.py'],text=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
|
||||
stdout, stderr = process.communicate() # Wait for the process to finish and capture output
|
||||
print("Standard Output:", stdout)
|
||||
print("Standard Error:", stderr)
|
||||
# try:
|
||||
# all_services = subprocess.check_output(["sudo", "docker","stats","--no-stream","--format","json"],text=True).split('\n')[:-1]
|
||||
# except subprocess.CalledProcessError as e:
|
||||
# print(f"Command failed with return code {e.returncode}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
schedule.every(30).seconds.do(check_util_exec)
|
||||
# schedule.every(30).seconds.do(check_util_exec)
|
||||
# while True:
|
||||
# schedule.run_pending()
|
||||
# time.sleep(1)
|
||||
while True:
|
||||
schedule.run_pending()
|
||||
time.sleep(1)
|
||||
|
||||
check_util_exec()
|
||||
time.sleep(30)
|
||||
|
||||
@@ -61,6 +61,7 @@ services:
|
||||
- ../clickhouse/node1-config/:/etc/clickhouse-server/config.d/
|
||||
- ../clickhouse/node-entrypoints/main:/docker-entrypoint-initdb.d
|
||||
- ../preprocessing/geoip.csv:/var/lib/clickhouse/user_files/csv/ip_region_map.csv
|
||||
- ../preprocessing/geoip_cc.csv:/var/lib/clickhouse/user_files/csv/ip_region_cc_map.csv
|
||||
- clickhouse_server1_data:/var/lib/clickhouse
|
||||
- clickhouse_server1_TTL:/clickhouse_data/server1
|
||||
networks:
|
||||
|
||||
@@ -19,9 +19,10 @@ SETTINGS storage_policy = 'hot_cold';
|
||||
CREATE TABLE ip_region_map (
|
||||
ip_range_start IPv4,
|
||||
ip_range_end IPv4,
|
||||
region LowCardinality(String),
|
||||
ip_range_cidr String MATERIALIZED IPv4RangeToCIDRString(ip_range_start, ip_range_end),
|
||||
INDEX region_idx region TYPE bloom_filter
|
||||
country_code LowCardinality(String),
|
||||
country LowCardinality(String),
|
||||
INDEX country_idx country TYPE bloom_filter
|
||||
) ENGINE = ReplicatedMergeTree(
|
||||
'/clickhouse/tables/{shard}/ip_region_map',
|
||||
'{replica}'
|
||||
@@ -29,7 +30,7 @@ CREATE TABLE ip_region_map (
|
||||
ORDER BY ip_range_start;
|
||||
|
||||
CREATE DICTIONARY ip_region_dict
|
||||
(ip_range_cidr String, region String)
|
||||
(ip_range_cidr String, country_code String, country String)
|
||||
PRIMARY KEY ip_range_cidr
|
||||
SOURCE(CLICKHOUSE(TABLE 'ip_region_map'))
|
||||
LAYOUT(ip_trie)
|
||||
|
||||
@@ -19,9 +19,10 @@ SETTINGS storage_policy = 'hot_cold';
|
||||
CREATE TABLE ip_region_map (
|
||||
ip_range_start IPv4,
|
||||
ip_range_end IPv4,
|
||||
region LowCardinality(String),
|
||||
ip_range_cidr String MATERIALIZED IPv4RangeToCIDRString(ip_range_start, ip_range_end),
|
||||
INDEX region_idx region TYPE bloom_filter
|
||||
country_code LowCardinality(String),
|
||||
country LowCardinality(String),
|
||||
INDEX country_idx country TYPE bloom_filter
|
||||
) ENGINE = ReplicatedMergeTree(
|
||||
'/clickhouse/tables/{shard}/ip_region_map',
|
||||
'{replica}'
|
||||
@@ -29,7 +30,7 @@ CREATE TABLE ip_region_map (
|
||||
ORDER BY ip_range_start;
|
||||
|
||||
CREATE DICTIONARY ip_region_dict
|
||||
(ip_range_cidr String, region String)
|
||||
(ip_range_cidr String, country_code String, country String)
|
||||
PRIMARY KEY ip_range_cidr
|
||||
SOURCE(CLICKHOUSE(TABLE 'ip_region_map'))
|
||||
LAYOUT(ip_trie)
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
INSERT INTO ip_region_map (ip_range_start, ip_range_end, region)
|
||||
FROM INFILE '/var/lib/clickhouse/user_files/csv/ip_region_map.csv'
|
||||
INSERT INTO ip_region_map (ip_range_start, ip_range_end, country_code, country)
|
||||
FROM INFILE '/var/lib/clickhouse/user_files/csv/ip_region_cc_map.csv'
|
||||
FORMAT CSVWithNames;
|
||||
|
||||
Reference in New Issue
Block a user