当前位置:编程学习 > python >>

分析几个测试人员常用python脚本

1、日志分析工具
2、性能监控工具
3、数据处理工具
4、文件处理工具
5、网络检测工具
6、定时任务工具

日志分析工具

PART 01
1.1

提取日志中错误信息
def extract_errors(log_file, output_file):
    """Copy every line containing "ERROR" from log_file into output_file.

    Args:
        log_file: Path of the log file to scan.
        output_file: Path the matching lines are written to (overwritten).
    """
    # Stream line-by-line instead of readlines() so huge logs don't have
    # to fit in memory.
    with open(log_file, 'r') as src, open(output_file, 'w') as dst:
        for line in src:
            if "ERROR" in line:
                dst.write(line)


# Example usage — guarded so importing this module has no side effects
if __name__ == "__main__":
    extract_errors("app.log", "errors.log")


1.2

统计日志中关键词出现次数
from collections import defaultdict

def count_keywords(log_file, keywords):
    """Count how many lines of log_file contain each keyword.

    A line containing the same keyword several times still counts once
    for that keyword (line-based counting).

    Args:
        log_file: Path of the log file to scan.
        keywords: Iterable of substrings to look for.

    Returns:
        defaultdict(int) mapping keyword -> number of lines containing it.
    """
    keyword_count = defaultdict(int)

    with open(log_file, 'r') as f:
        for line in f:
            for keyword in keywords:
                if keyword in line:
                    keyword_count[keyword] += 1

    return keyword_count


# Example usage — guarded so importing this module has no side effects
if __name__ == "__main__":
    keywords = ["ERROR", "WARNING", "INFO"]
    result = count_keywords("app.log", keywords)
    print(result)


1.3

按时间范围过滤日志
from datetime import datetime

def filter_logs_by_time(log_file, start_time, end_time, output_file):
    """Write the lines of log_file whose timestamp falls in [start, end].

    Assumes each line starts with a "YYYY-MM-DD HH:MM:SS" timestamp as its
    first two whitespace-separated fields — TODO confirm against the real
    log format; lines that don't match will raise ValueError/IndexError.

    Args:
        log_file: Path of the input log.
        start_time: Inclusive lower bound, "YYYY-MM-DD HH:MM:SS".
        end_time: Inclusive upper bound, "YYYY-MM-DD HH:MM:SS".
        output_file: Path the matching lines are written to (overwritten).
    """
    start = datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
    end = datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S")

    filtered_logs = []
    # (The original "withopen"/"forloginlogs"/"ifstart" were copy-mangled
    # syntax errors; this is the intended logic.)
    with open(log_file, 'r') as f:
        for log in f:
            parts = log.split()
            log_time = datetime.strptime(parts[0] + " " + parts[1],
                                         "%Y-%m-%d %H:%M:%S")
            if start <= log_time <= end:
                filtered_logs.append(log)

    with open(output_file, 'w') as f:
        f.writelines(filtered_logs)


# Example usage — guarded; note the original example had start/end
# reversed (start 02-26, end 02-06), which selected nothing.
if __name__ == "__main__":
    filter_logs_by_time("app.log", "2025-02-06 12:00:00",
                        "2025-02-26 14:00:00", "filtered_logs.log")


1.4

统计日志中高频错误
from collections import Counter
import re

def top_n_errors(log_file, n=5):
    """Return the n most frequent "ERROR: <message>" messages in log_file.

    Args:
        log_file: Path of the log file to scan.
        n: How many of the most common messages to return.

    Returns:
        List of (message, count) tuples, most frequent first.
    """
    # Compiled once, outside the per-line loop.
    error_pattern = re.compile(r"ERROR: (.+)")
    errors = []

    with open(log_file, 'r') as f:
        for line in f:
            match = error_pattern.search(line)
            if match:
                errors.append(match.group(1))

    return Counter(errors).most_common(n)


# Example usage — guarded so importing this module has no side effects
if __name__ == "__main__":
    top_errors = top_n_errors("app.log", n=3)
    print(top_errors)


1.5

日志文件合并
def merge_log_files(log_files, output_file):
    """Concatenate log_files, in the given order, into output_file.

    Args:
        log_files: Sequence of input file paths.
        output_file: Destination path (overwritten).
    """
    with open(output_file, 'w') as outfile:
        for log_file in log_files:
            with open(log_file, 'r') as infile:
                # Copy in fixed-size chunks so a huge log does not have
                # to fit in memory at once (the original used read()).
                while True:
                    chunk = infile.read(65536)
                    if not chunk:
                        break
                    outfile.write(chunk)


# Example usage — guarded so importing this module has no side effects
if __name__ == "__main__":
    merge_log_files(["log1.log", "log2.log", "log3.log"], "merged_logs.log")


1.6

日志文件实时监控
import time

def tail_log_file(log_file):
    """Follow log_file like `tail -f`, printing new lines as they appear.

    Runs forever; interrupt with Ctrl-C.

    Args:
        log_file: Path of the log file to follow.
    """
    with open(log_file, 'r') as f:
        f.seek(0, 2)  # jump to end of file so only new lines are shown
        while True:
            line = f.readline()
            if line:
                print(line.strip())
            else:
                # Nothing new yet: poll again shortly instead of busy-spinning.
                time.sleep(0.1)


# Example usage — guarded: the original called this infinite loop at
# import time, which blocked any module that imported the file.
if __name__ == "__main__":
    tail_log_file("app.log")

性能监控工具

PART 02
2.1

监控cpu和内存使用情况
import psutil
import time

def monitor_system(interval=1):
    """Print CPU and memory utilisation forever, every sampling period.

    Args:
        interval: Sampling period in seconds. Note cpu_percent(interval=...)
            itself blocks for `interval` seconds while measuring, so each
            loop iteration takes roughly 2 * interval in total.
    """
    while True:
        cpu_usage = psutil.cpu_percent(interval=interval)
        memory_usage = psutil.virtual_memory().percent
        print(f"CPU Usage: {cpu_usage}% | Memory Usage: {memory_usage}%")
        time.sleep(interval)


# Example usage — guarded: the original started this infinite loop at
# import time.
if __name__ == "__main__":
    monitor_system(interval=2)


2.2

监控GPU使用情况
import pynvml

def monitor_gpu_usage():
    """Print utilisation and memory used for every NVIDIA GPU, once.

    Requires the third-party pynvml package and an NVIDIA driver.
    """
    pynvml.nvmlInit()
    try:
        for i in range(pynvml.nvmlDeviceGetCount()):
            handle = pynvml.nvmlDeviceGetHandleByIndex(i)
            util = pynvml.nvmlDeviceGetUtilizationRates(handle)
            memory_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
            print(f"GPU {i}: Usage={util.gpu}%, Memory Used={memory_info.used / 1024 ** 2} MB")
    finally:
        # Pair nvmlInit with nvmlShutdown — the original leaked the
        # NVML library handle.
        pynvml.nvmlShutdown()


# Example usage — guarded so importing this module has no side effects
if __name__ == "__main__":
    monitor_gpu_usage()

2.3

监控网络带宽
import psutil
import time

def monitor_network_usage(interval=1):
    """Print total network bandwidth (sent + received) forever.

    Args:
        interval: Sampling period in seconds.
    """
    # Take one counter snapshot per sample; the original read
    # net_io_counters() twice, so sent and received came from
    # slightly different instants.
    counters = psutil.net_io_counters()
    old_value = counters.bytes_sent + counters.bytes_recv

    while True:  # the original "whileTrue:" was a copy-mangled syntax error
        counters = psutil.net_io_counters()
        new_value = counters.bytes_sent + counters.bytes_recv
        bandwidth = (new_value - old_value) / interval  # bytes per second
        print(f"Network Bandwidth: {bandwidth} B/s")

        old_value = new_value
        time.sleep(interval)


# Example usage — guarded: the original started this infinite loop at
# import time.
if __name__ == "__main__":
    monitor_network_usage(interval=2)

2.4

监控磁盘IO
import psutil
import time

def monitor_disk_io(interval=1):
    """Print disk read/write throughput in KB/s forever.

    Args:
        interval: Sampling period in seconds.
    """
    # One snapshot per sample so read and write counters are consistent.
    io = psutil.disk_io_counters()
    old_read, old_write = io.read_bytes, io.write_bytes

    while True:  # the original "whileTrue:" was a copy-mangled syntax error
        io = psutil.disk_io_counters()
        new_read, new_write = io.read_bytes, io.write_bytes

        read_speed = (new_read - old_read) / interval
        write_speed = (new_write - old_write) / interval

        print(f"Read Speed: {read_speed / 1024} KB/s | Write Speed: {write_speed / 1024} KB/s")

        old_read, old_write = new_read, new_write
        time.sleep(interval)


# Example usage — guarded: the original started this infinite loop at
# import time.
if __name__ == "__main__":
    monitor_disk_io(interval=2)

2.5

监控进程资源占用
import psutil

def monitor_process(pid):
    """Print CPU and memory usage of process `pid` forever, ~once a second.

    Args:
        pid: PID of the process to watch.

    Raises:
        psutil.NoSuchProcess: If no process with the given PID exists.
    """
    process = psutil.Process(pid)

    while True:
        # cpu_percent(interval=1) blocks for one second while measuring,
        # which is what paces the loop (there is no explicit sleep).
        cpu_usage = process.cpu_percent(interval=1)
        memory_usage = process.memory_info().rss / 1024 ** 2  # bytes -> MB
        print(f"PID {pid}: CPU={cpu_usage}%, Memory={memory_usage} MB")


# Example usage — guarded: the original started this infinite loop at
# import time.
if __name__ == "__main__":
    monitor_process(1234)  # replace with the target process PID

2.6

监控系统温度
import psutil

def monitor_temperature():
    """Print the current reading of every temperature sensor, once.

    NOTE(review): psutil.sensors_temperatures() only exists on some
    platforms (mainly Linux); elsewhere the attribute is missing —
    confirm the target platform before relying on this.
    """
    temps = psutil.sensors_temperatures()
    for name, entries in temps.items():
        for entry in entries:
            print(f"{name}: {entry.label} = {entry.current}°C")


# Example usage — guarded so importing this module has no side effects
if __name__ == "__main__":
    monitor_temperature()


数据处理工具

PART 03
3.1

数据清洗
import pandas as pd

def clean_data(input_file, output_file):
    """Drop rows with missing values and duplicate rows from a CSV.

    Args:
        input_file: Path of the CSV to read.
        output_file: Path the cleaned CSV is written to (no index column).
    """
    df = pd.read_csv(input_file)
    df.dropna(inplace=True)          # drop rows containing any NaN
    df.drop_duplicates(inplace=True)  # then drop exact duplicate rows
    df.to_csv(output_file, index=False)


# Example usage — guarded so importing this module has no side effects
if __name__ == "__main__":
    clean_data("data.csv", "cleaned_data.csv")


3.2

数据对比
import pandas as pd

def compare_data(file1, file2):
    """Show cell-level differences between two CSV files.

    Both files must have identical shape and identical row/column labels
    (a requirement of DataFrame.compare).

    Args:
        file1: Path of the first CSV ("self" side of the diff).
        file2: Path of the second CSV ("other" side of the diff).

    Returns:
        DataFrame containing only the differing cells, with a
        (column, "self"/"other") column MultiIndex.
    """
    df1 = pd.read_csv(file1)
    df2 = pd.read_csv(file2)
    diff = df1.compare(df2)
    return diff


# Example usage — guarded so importing this module has no side effects
if __name__ == "__main__":
    result = compare_data("file1.csv", "file2.csv")
    print(result)


文件处理工具

PART 04
4.1

批量重命名文件
import os

def batch_rename(directory, prefix):
    """Rename every regular file in directory to "<prefix>_<index><ext>".

    Fixes in this version: the original forced a ".txt" suffix onto every
    file regardless of its real extension, numbered files in arbitrary
    os.listdir order, and tried to rename subdirectories too.  Here the
    original extension is preserved, entries are processed in sorted
    order so numbering is deterministic, and directories are skipped.

    Args:
        directory: Directory whose files are renamed in place.
        prefix: New base name for each file.
    """
    for count, filename in enumerate(sorted(os.listdir(directory))):
        old_path = os.path.join(directory, filename)
        if not os.path.isfile(old_path):
            continue  # leave subdirectories alone
        ext = os.path.splitext(filename)[1]
        os.rename(old_path, os.path.join(directory, f"{prefix}_{count}{ext}"))


# Example usage — guarded so importing this module has no side effects
if __name__ == "__main__":
    batch_rename("/path/to/files", "file")


4.2

查找大文件
import os

def find_large_files(directory, size_limit_mb):
    """Recursively list files under directory larger than size_limit_mb MB.

    Args:
        directory: Root directory to walk.
        size_limit_mb: Threshold in megabytes; files strictly larger
            than this are reported.

    Returns:
        List of absolute/relative paths (as joined from `directory`).
    """
    size_limit = size_limit_mb * 1024 * 1024  # convert MB to bytes
    large_files = []

    for root, _dirs, files in os.walk(directory):
        for name in files:
            file_path = os.path.join(root, name)
            try:
                if os.path.getsize(file_path) > size_limit:
                    large_files.append(file_path)
            except OSError:
                # File vanished mid-walk or is a broken symlink — the
                # original crashed here; just skip it.
                continue

    return large_files


# Example usage — guarded so importing this module has no side effects
if __name__ == "__main__":
    large_files = find_large_files("/path/to/directory", 100)  # files > 100 MB
    print(large_files)


网络检测工具

PART 05
5.1

检测端口是否开放
import socket

def check_port(host, port, timeout=3):
    """Return True if a TCP connection to (host, port) succeeds.

    Args:
        host: Hostname or IP address.
        port: TCP port number.
        timeout: Seconds to wait before giving up (new parameter with a
            default, so existing two-argument callers still work; the
            original could hang indefinitely on a filtered port).

    Returns:
        True if the port accepted a connection, False otherwise.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        return sock.connect_ex((host, port)) == 0
    finally:
        # Close even if connect_ex raises — the original leaked the
        # socket on exception.
        sock.close()


# Example usage — guarded: the original made a network call at import time
if __name__ == "__main__":
    if check_port("example.com", 80):
        print("Port 80 is open")
    else:
        print("Port 80 is closed")

5.2

批量ping测试
import os

def ping_hosts(hosts):
    """Ping each host once and print whether it responded.

    Uses subprocess.run with an argument list (shell=False) so a hostname
    cannot inject shell commands — the original interpolated the host into
    an os.system() shell string.  NOTE(review): "-c 1" is the Unix count
    flag; on Windows ping uses "-n 1" — confirm the target platform.

    Args:
        hosts: Iterable of hostnames or IP addresses.
    """
    import subprocess  # local import keeps this snippet self-contained

    for host in hosts:
        result = subprocess.run(["ping", "-c", "1", host])
        if result.returncode == 0:
            print(f"{host} is up")
        else:
            print(f"{host} is down")


# Example usage — guarded: the original pinged at import time
if __name__ == "__main__":
    hosts = ["google.com", "example.com", "localhost"]
    ping_hosts(hosts)

定时任务工具

PART 06
6.1

定时执行任务
import time
from datetime import datetime

def scheduled_task(interval):
    """Print a timestamp forever, once every `interval` seconds.

    Runs until interrupted (Ctrl-C).

    Args:
        interval: Delay between executions, in seconds.
    """
    while True:
        print(f"Task executed at {datetime.now()}")
        time.sleep(interval)


# Example usage — guarded: the original started this infinite loop at
# import time.
if __name__ == "__main__":
    scheduled_task(60)  # run once every 60 seconds
CopyRight © 2022 站长资源库 编程知识问答 zzzyk.com All Rights Reserved
部分文章来自网络,