球baba的欢乐时光

vuePress-theme-reco 球baba    2020 - 2025
球baba的欢乐时光 球baba的欢乐时光

Choose mode

  • dark
  • auto
  • light
Home
Category
  • 伪技术宅
Tag
TimeLine
Contact
  • reco_luan大大的NPM
  • reco_luan大大的GitHub
author-avatar

球baba

27

文章

58

标签

Home
Category
  • 伪技术宅
Tag
TimeLine
Contact
  • reco_luan大大的NPM
  • reco_luan大大的GitHub
  • 使用python增加CPU使用率

使用python增加CPU使用率

vuePress-theme-reco 球baba    2020 - 2025

使用python增加CPU使用率


球baba 2024-11-10 11:14:00 python cpu usage

要在linux的主机上让CPU达到一定的负荷有很多方案,比如用dd,1个dd进程就能把1个CPU核心跑满,但总觉得这样缺少变化

感觉最佳解决方案是用cpu-load-generator ,但离线主机还要手工安装就觉得有点烦

找了个不用额外安装就能用的例子Generate CPU load using Python

稍微修改一下,加个循环就可以实现长时间随机高CPU使用率了

本来是想用线程修改全局变量来记录进程ID,但用线程的话python进程最多只能跑到100%(一个核心),还是要像示例里那样新开进程才能跑满多个核心;而子进程无法修改父进程的全局变量,所以这里使用ini文件来记录进程ID,用sqlite3应该也可以。

import math
import time
import multiprocessing
import random
import configparser
import os
import sys

def get_config_file_path():
    """Return the path of ``monitor.ini`` in the same directory as this script.

    The directory is derived from ``__file__``, so the result is relative
    whenever ``__file__`` itself is relative.
    """
    return os.path.join(os.path.dirname(__file__), 'monitor.ini')

def check_config():
    """Create or reset ``monitor.ini`` so ``[monitor] processes`` starts empty.

    If the file does not exist it is created and a confirmation message is
    printed. If it already exists, its other contents are kept, the
    ``monitor`` section is added when missing, and ``processes`` is cleared
    (presumably because PIDs left by an earlier run no longer refer to live
    workers).
    """
    appconfig_path = get_config_file_path()
    first_run = not os.path.exists(appconfig_path)

    config = configparser.ConfigParser()
    # ConfigParser.read() silently skips files that cannot be opened, so this
    # is safe on the first run when the file is still missing.
    config.read(appconfig_path, encoding='utf-8')

    if not config.has_section("monitor"):
        config.add_section("monitor")
    config.set("monitor", "processes", "")

    # Write once, and close the handle deterministically.
    with open(appconfig_path, "w", encoding='utf-8') as config_file:
        config.write(config_file)

    if first_run:
        print("monitor.ini initialized success")


def ini_read(section,option):
    """Return the string stored under ``option`` in ``section`` of monitor.ini.

    The file is re-parsed on every call, so the value reflects what is on
    disk at call time. Lookup errors raised by ``ConfigParser.get`` (missing
    section or option) propagate to the caller.
    """
    parser = configparser.ConfigParser()
    parser.read(get_config_file_path(), encoding='utf-8')
    return parser.get(section, option)

def ini_write(section,option,value):
    """Set ``option`` in ``section`` of monitor.ini to ``value`` and save it.

    The whole file is read, modified in memory and rewritten. No lock is
    taken, so concurrent callers in other processes can overwrite each
    other's updates.

    Args:
        section: Name of an existing section; ``ConfigParser.set`` raises if
            the section is missing.
        option: Option name to create or overwrite.
        value: String value to store.
    """
    appconfig_path = get_config_file_path()
    config = configparser.ConfigParser()
    config.read(appconfig_path, encoding='utf-8')

    config.set(section, option, value)
    # Use a context manager so the handle is flushed and closed right away;
    # other processes re-read this file immediately afterwards.
    with open(appconfig_path, "w", encoding='utf-8') as config_file:
        config.write(config_file)

def monitor():
    """Worker body: register this PID, burn CPU for a while, then unregister.

    The worker appends its PID to the ``|``-separated ``[monitor] processes``
    entry of monitor.ini, then runs for a random 60-120 iterations of about
    one second each. In every iteration it busy-loops for ``util`` percent of
    the second (``util`` is drawn once from 65-80) and sleeps for the rest.
    Finally it removes its PID from the ini entry.
    """
    start_time = time.time()
    pid = str(os.getpid())

    # Register this worker in the shared PID list.
    registered = ini_read("monitor", "processes")
    if registered:
        registered = registered + "|" + pid
    else:
        registered = pid
    ini_write("monitor", "processes", registered)

    duration = random.randint(60, 120)
    util = random.randint(65, 80)
    busy_fraction = util / 100.00

    for _ in range(duration):
        # Spin until the busy share of the current one-second slot is used up.
        while time.time() - start_time < busy_fraction:
            math.pow(2, 10)
        time.sleep(1 - busy_fraction)
        # Advance the slot by exactly one second regardless of real elapsed time.
        start_time += 1

    # Unregister. The ini read-modify-write is not synchronised between
    # processes, so this PID may already be missing; filtering (instead of
    # list.remove) avoids a ValueError in that case and drops empty entries.
    remaining = [
        entry
        for entry in ini_read("monitor", "processes").split("|")
        if entry and entry != pid
    ]
    ini_write("monitor", "processes", "|".join(remaining))



if __name__ == "__main__":
    check_config()

    # Supervisor loop: once per second, start another worker process while
    # fewer PIDs are registered in monitor.ini than there are CPUs.
    while True:
        try:
            monitor_processes = ini_read("monitor", "processes")
            # "".split("|") yields [""], so drop empty entries; otherwise an
            # empty registry counts as one worker and a single-CPU host never
            # starts anything.
            processes = [pid for pid in monitor_processes.split("|") if pid]
            if len(processes) < multiprocessing.cpu_count():
                p = multiprocessing.Process(target=monitor)
                p.start()
        except Exception as e:
            # Best effort: report the problem and keep supervising.
            sys.stderr.write("monitor loop error: %s\n" % e)
        # Sleep outside the try block so a persistent error cannot turn this
        # loop into a busy spin.
        time.sleep(1)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101

如果python用的是2.X版本,需要稍作调整,需要用下面这段才行

import math
import time
import multiprocessing
import random
import ConfigParser
import os
import sys

def get_config_file_path():
    """Return the path of ``monitor.ini`` in the same directory as this script.

    The directory is derived from ``__file__``, so the result is relative
    whenever ``__file__`` itself is relative.
    """
    return os.path.join(os.path.dirname(__file__), 'monitor.ini')

def check_config():
    """Create or reset ``monitor.ini`` so ``[monitor] processes`` starts empty.

    If the file does not exist it is created and a confirmation message is
    printed. If it already exists, its other contents are kept, the
    ``monitor`` section is added when missing, and ``processes`` is cleared
    (presumably because PIDs left by an earlier run no longer refer to live
    workers).
    """
    appconfig_path = get_config_file_path()
    first_run = not os.path.exists(appconfig_path)

    config = ConfigParser.ConfigParser()
    # ConfigParser.read() silently skips files that cannot be opened, so this
    # is safe on the first run when the file is still missing.
    config.read(appconfig_path)

    if not config.has_section("monitor"):
        config.add_section("monitor")
    config.set("monitor", "processes", "")

    # Write once, and close the handle deterministically.
    with open(appconfig_path, "w") as config_file:
        config.write(config_file)

    if first_run:
        print("monitor.ini initialized success")


def ini_read(section,option):
    """Return the string stored under ``option`` in ``section`` of monitor.ini.

    The file is re-parsed on every call, so the value reflects what is on
    disk at call time. Lookup errors raised by ``ConfigParser.get`` (missing
    section or option) propagate to the caller.
    """
    parser = ConfigParser.ConfigParser()
    parser.read(get_config_file_path())
    return parser.get(section, option)

def ini_write(section,option,value):
    """Set ``option`` in ``section`` of monitor.ini to ``value`` and save it.

    The whole file is read, modified in memory and rewritten. No lock is
    taken, so concurrent callers in other processes can overwrite each
    other's updates.

    Args:
        section: Name of an existing section; ``ConfigParser.set`` raises if
            the section is missing.
        option: Option name to create or overwrite.
        value: String value to store.
    """
    appconfig_path = get_config_file_path()
    config = ConfigParser.ConfigParser()
    config.read(appconfig_path)

    config.set(section, option, value)
    # Use a context manager so the handle is flushed and closed right away;
    # other processes re-read this file immediately afterwards.
    with open(appconfig_path, "w") as config_file:
        config.write(config_file)

def monitor():
    """Worker body: register this PID, burn CPU for a while, then unregister.

    The worker appends its PID to the ``|``-separated ``[monitor] processes``
    entry of monitor.ini, then runs for a random 60-120 iterations of about
    one second each. In every iteration it busy-loops for ``util`` percent of
    the second (``util`` is drawn once from 65-80) and sleeps for the rest.
    Finally it removes its PID from the ini entry.
    """
    start_time = time.time()
    pid = str(os.getpid())

    # Register this worker in the shared PID list.
    registered = ini_read("monitor", "processes")
    if registered:
        registered = registered + "|" + pid
    else:
        registered = pid
    ini_write("monitor", "processes", registered)

    duration = random.randint(60, 120)
    util = random.randint(65, 80)
    busy_fraction = util / 100.00

    for _ in range(duration):
        # Spin until the busy share of the current one-second slot is used up.
        while time.time() - start_time < busy_fraction:
            math.pow(2, 10)
        time.sleep(1 - busy_fraction)
        # Advance the slot by exactly one second regardless of real elapsed time.
        start_time += 1

    # Unregister. The ini read-modify-write is not synchronised between
    # processes, so this PID may already be missing; filtering (instead of
    # list.remove) avoids a ValueError in that case and drops empty entries.
    remaining = [
        entry
        for entry in ini_read("monitor", "processes").split("|")
        if entry and entry != pid
    ]
    ini_write("monitor", "processes", "|".join(remaining))



if __name__ == "__main__":
    check_config()

    # Supervisor loop: once per second, start another worker process while
    # fewer PIDs are registered in monitor.ini than there are CPUs.
    while True:
        try:
            monitor_processes = ini_read("monitor", "processes")
            # "".split("|") yields [""], so drop empty entries before
            # counting; with real entries only, a strict "<" caps the number
            # of registered workers at cpu_count() instead of one more.
            processes = [pid for pid in monitor_processes.split("|") if pid]
            if len(processes) < multiprocessing.cpu_count():
                p = multiprocessing.Process(target=monitor)
                p.start()
        except Exception as e:
            # Best effort: report the problem and keep supervising.
            sys.stderr.write("monitor loop error: %s\n" % e)
        # Sleep outside the try block so a persistent error cannot turn this
        # loop into a busy spin.
        time.sleep(1)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101

🌻🌻🌻