"""Multi-threaded bulk-insert benchmark for ClickHouse over the native protocol.

LZ4 compression needs the driver's optional extras:

    pip install clickhouse-driver[lz4]
"""
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

from clickhouse_driver import Client
from faker import Faker


class ClickHouseBenchmark:
    """Generate fake rows and bulk-insert them into a ClickHouse test table.

    ``self.client`` is the connection owned by the thread that constructed
    the instance. Worker threads started by :meth:`run_benchmark` each get
    their own lazily created client, because a clickhouse-driver connection
    rejects simultaneous queries issued from several threads.
    """

    def __init__(self, host='localhost', port=9000, user='default',
                 password='', database='default'):
        """Connect over the native protocol and make sure the table exists.

        Args:
            host: ClickHouse server host name or IP address.
            port: Native-protocol port.
            user: User name.
            password: Password for ``user``.
            database: Database in which the test table is created.
        """
        # Kept so that identical clients can be created for worker threads.
        self._connection_params = {
            'host': host,
            'port': port,
            'user': user,
            'password': password,
            'database': database,
        }
        # NOTE(review): with async_insert=1 and wait_for_async_insert=0 the
        # server presumably acknowledges inserts before flushing them, so the
        # measured time would exclude the flush -- confirm this is intended.
        self._settings = {
            'max_block_size': 1000000,
            'async_insert': 1,
            'wait_for_async_insert': 0,
        }

        self._local = threading.local()
        self._worker_clients = []
        self._worker_clients_lock = threading.Lock()

        # Native-protocol client with LZ4 compression enabled.
        self.client = self._new_client()
        # The constructing thread reuses self.client in write_batch().
        self._local.client = self.client

        self.fake = Faker()
        self.table_name = 'perf_test'
        self._init_table()

    def _new_client(self):
        """Return a new client configured with this instance's settings."""
        return Client(
            **self._connection_params,
            settings=dict(self._settings),
            compression='lz4',
        )

    def _thread_client(self):
        """Return the calling thread's client, creating it on first use."""
        client = getattr(self._local, 'client', None)
        if client is None:
            client = self._new_client()
            self._local.client = client
            with self._worker_clients_lock:
                self._worker_clients.append(client)
        return client

    def _close_worker_clients(self):
        """Disconnect every client that was created for a worker thread."""
        with self._worker_clients_lock:
            clients, self._worker_clients = self._worker_clients, []
        for client in clients:
            client.disconnect()

    def _init_table(self):
        """Create the test table (MergeTree engine) if it does not exist."""
        self.client.execute(f"""
            CREATE TABLE IF NOT EXISTS {self.table_name} (
                id UInt64,
                name String,
                email String,
                created_at DateTime,
                value Float64,
                tags Map(String, String)
            ) ENGINE = MergeTree()
            ORDER BY (created_at, id)
            SETTINGS index_granularity = 8192
        """)

    def generate_batch(self, batch_size=50000, start_id=0):
        """Build one batch of fake rows matching the test table's columns.

        Args:
            batch_size: Number of rows to generate.
            start_id: ``id`` of the first row; later rows count up from it.
                The default of 0 gives ids ``0..batch_size-1``.

        Returns:
            A list of ``(id, name, email, created_at, value, tags)`` tuples.
        """
        fake = self.fake
        return [
            (
                row_id,                                   # id
                fake.name(),                              # name
                fake.email(),                             # email
                datetime.now(),                           # created_at
                fake.pyfloat(),                           # value
                {'dept': fake.job(), 'loc': fake.city()}  # tags
            )
            for row_id in range(start_id, start_id + batch_size)
        ]

    def write_batch(self, data):
        """Insert one batch of rows using the calling thread's own client.

        Args:
            data: Rows as produced by :meth:`generate_batch`.

        Returns:
            Whatever ``Client.execute`` returns for the INSERT.
        """
        return self._thread_client().execute(
            f"INSERT INTO {self.table_name} VALUES",
            data,
            types_check=True,
        )

    def run_benchmark(self, total_records=1_000_000, batch_size=50_000, threads=4):
        """Insert ``total_records`` rows with a thread pool and print throughput.

        Batches are generated on the calling thread and written by the pool,
        so the reported time covers data generation as well as the inserts.
        When ``total_records`` is not a multiple of ``batch_size`` a final,
        smaller batch is written so that no rows are dropped.

        Args:
            total_records: Total number of rows to insert (>= 0).
            batch_size: Rows per INSERT (> 0).
            threads: Number of worker threads.

        Raises:
            ValueError: If ``batch_size`` is not positive or
                ``total_records`` is negative.
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")
        if total_records < 0:
            raise ValueError("total_records must not be negative")

        print(f"🏁 开始测试: 总数据量={total_records:,} | 批量大小={batch_size:,} | 线程数={threads}")

        full_batches, remainder = divmod(total_records, batch_size)
        sizes = [batch_size] * full_batches
        if remainder:
            sizes.append(remainder)

        start_time = time.time()
        try:
            with ThreadPoolExecutor(max_workers=threads) as executor:
                # Submit each batch as soon as it has been generated.
                futures = []
                next_id = 0
                for size in sizes:
                    rows = self.generate_batch(size, start_id=next_id)
                    futures.append(executor.submit(self.write_batch, rows))
                    next_id += size

                # Wait for every insert; re-raises the first worker error.
                for future in futures:
                    future.result()
        finally:
            self._close_worker_clients()

        duration = time.time() - start_time
        # Guard against a zero elapsed time (e.g. total_records == 0).
        rate = total_records / duration if duration > 0 else 0.0
        print(f"""
        ✅ 测试完成
        总耗时: {duration:.2f}秒
        写入速度: {rate:,.0f} 条/秒
        """)


def main():
    """Run the benchmark against the server described by the environment.

    Connection settings come from ``CLICKHOUSE_HOST``, ``CLICKHOUSE_PORT``,
    ``CLICKHOUSE_USER``, ``CLICKHOUSE_PASSWORD`` and ``CLICKHOUSE_DATABASE``.
    The password is deliberately not hard-coded; it defaults to empty.
    """
    benchmark = ClickHouseBenchmark(
        host=os.environ.get('CLICKHOUSE_HOST', '10.10.14.250'),   # server IP
        port=int(os.environ.get('CLICKHOUSE_PORT', '9000')),      # native port
        user=os.environ.get('CLICKHOUSE_USER', 'default'),        # user name
        password=os.environ.get('CLICKHOUSE_PASSWORD', ''),       # password
        database=os.environ.get('CLICKHOUSE_DATABASE', 'default'),
    )

    # Benchmark parameters can be adjusted here.
    benchmark.run_benchmark(
        total_records=1_000_000,  # total number of rows
        batch_size=50_000,        # rows per batch
        threads=4                 # concurrent worker threads
    )


if __name__ == "__main__":
    main()