跳转至

宛如泥潭的大型项目开发困境

导言

当时我选择一线的原因是决定能最解决客户,每个工作能产生最大的价值。

通过一段时间的开发,我感觉在一线开发就像在泥潭里前进:走得越快越远,泥潭陷得越深,前进阻力越大。

困境为何而来,如何解决困境,是我想讨论的重点。

泥潭

  1. 开发者:
    1. 缺少全局视野,只专注于自己需求的交付;导致代码质量参差不齐、风格差异大、冗余, 代码量也极速增长,(e.g., MindSpeed-MM半年内代码变成14万)
    2. 方案: 1. 重构代码,提高重用率,易读易用性。
  2. 外部模块:
    1. 在当前项目很庞大的情况下,涉及的外部仓库也众多;在一些难以覆盖的冷门用例里,往往会出现当前项目的项目修改,通过模块A的接口,在模块B里产生报错。(e.g.,PTA中lazyinit GE的启动,会通过CANN接口,在torchair里出现报错);但是模块A/B不仅文档缺乏而且代码也不开源,导致debug困难。
    2. 临时方案: 只能通过二分代码修改,二分commit来测试, 定位到是哪个commit导致bug。
    3. 长期方案: 开发中要求 1. 每个coomit要保证功能原子化,并且能够测试。 2. 持续测试,来监控开发中每个commit修改。(在每天晚上的时间内跑更多的测试用例)
  3. 封装:
    1. 大型项目封装抽象之后,都变成了开发者在框架上开发一个可选的选项,用户侧只需要json里设置选项就能使用。问题就是在debug时,有20层的调用栈都是共通的,导致VizTracer特别大。
    2. 临时方案: VizTracer提供了选项,可以只记录关系的函数。
  4. 流水化串行路径:
    1. 项目为了加速,往往会选择并行化,串行任务流水化切分。常见方式是线程A处理完之后,把数据封装传给接口,线程B开始处理。这会导致debug时调用栈中断,加上过度封装,导致前20层是无用的调用栈信息。
    2. 方案: 流水化加速往往有环境变量开控制开启或者关闭,关闭之后串行执行debug,问题才更明晰。

Next

Step 1 : 监控修改

  • 操作:监控自己和他人的修改, 对正确性/可运行、精度、性能进行监控。
  • 目的:及早发现错误,避免错误累计导致纠错成本激增。

  • 借助windmill,搭建测试框架, 持续监控每个commit修改的正确性。

  • 二分commit测试脚本,用单次UT和每日ST看护起来。
  • 借助 Prometheus + Grafana ,实现数据库保存数据+告警。
Prometheus 基本使用

Prometheus + Grafana 并非传统意义上的关系型数据库应用,而是一个专门针对 时序数据(Time Series Data) 的监控与可视化组合。它们非常适合存储和分析您描述的 AI训练性能指标(时间、脚本、算子名称、输入参数规模、软件版本、吞吐量等)。以下是具体方案:


1. 技术栈定位 | 组件 | 角色 | 适用性说明 | |---------------|--------------------------------------------------------------|-----------------------------------| | Prometheus | 时序数据库 + 监控系统 | 存储时间序列数据,支持多维标签查询 | | Grafana | 数据可视化平台 | 创建交互式仪表盘,支持 Prometheus 数据源 | | AI训练代码 | 数据生产者 | 通过埋点推送性能指标到 Prometheus |


2. 数据模型设计 在 Prometheus 中,您的性能指标应设计为 带标签的时间序列,例如:

# 指标名称: operator_throughput
# 标签: script, operator_name, input_scale, can_version, mm_version
operator_throughput{
script="train_resnet.py",
operator_name="conv2d",
input_scale="[batch=32, in_ch=256, H=56, W=56]",
can_version="6.3.RC2",
mm_version="1.8.0"
} 1.2e9  # 值=1.2 GFLOPs/s

标签设计原则: - 高基数警告:避免标签值过多(如 input_scale 的每个具体数值都不同),建议对参数规模做分桶(如 input_scale="small/medium/large")。 - 版本控制can_versionmm_version 作为标签,方便对比不同软件版本性能。


3. 代码接入方案 (1) 在训练代码中埋点 使用 prometheus-client 库推送指标:

from prometheus_client import Gauge, push_to_gateway

# 定义指标
OP_THROUGHPUT = Gauge(
'operator_throughput',
'Throughput of AI operators (FLOPs/s)',
['script', 'operator_name', 'input_scale', 'can_version', 'mm_version']
)

def log_operator_performance(
script_name: str,
op_name: str,
input_shape: tuple,
can_ver: str,
mm_ver: str,
flops: float,
duration: float
):
throughput = flops / duration  # 计算吞吐量
OP_THROUGHPUT.labels(
      script=script_name,
      operator_name=op_name,
      input_scale=str(input_shape),
      can_version=can_ver,
      mm_version=mm_ver
).set(throughput)

# 推送到 Prometheus Pushgateway
push_to_gateway('prometheus:9091', job='ai_training', registry=REGISTRY)

(2) Prometheus 配置prometheus.yml 中添加 Pushgateway 目标:

scrape_configs:
- job_name: 'pushgateway'
honor_labels: true  # 保留用户提交的标签
static_configs:
      - targets: ['pushgateway:9091']


4. 数据存储与查询 #### 存储特性: - 时间序列数据库:数据按时间索引,自动处理时间窗口聚合。 - 保留策略:默认保留 15 天,可通过 --storage.tsdb.retention.time=365d 调整。

PromQL 查询示例

# 查询最新版本 CANN 的卷积算子平均吞吐量
avg(operator_throughput{operator_name="conv2d", can_version=~"6.3.*"}) by (input_scale)

# 对比不同软件版本的性能
operator_throughput{operator_name="matmul"} / ignoring(version) group_left operator_throughput{operator_name="matmul", can_version="6.2.0"}


5. Grafana 可视化 仪表盘设计建议: 1. 全局筛选器:脚本名称、算子类型、软件版本。 2. 核心图表: - 吞吐量随时间变化曲线 - 不同输入规模的性能热力图 - 版本对比柱状图 3. 统计面板:当前峰值、历史均值、性能变化率。

Grafana 示例


6. 报警机制 (1) Prometheus Alertmanager 配置规则检测性能劣化:

groups:
- name: ai_performance
rules:
- alert: OperatorThroughputDrop
expr: |
      operator_throughput
      / ignoring(version) group_left
      operator_throughput offset 1h
      < 0.8  # 吞吐量下降超过20%
for: 10m
labels:
      severity: critical
annotations:
      summary: "算子 {{ $labels.operator_name }} 性能下降 (当前 {{ $value }}x)"

(2) Grafana 内置报警 直接在仪表盘中设置阈值报警:

WHEN last() OF query(A, 5m, now) IS BELOW 1e9


7. 扩展方案 #### 长期存储: - Thanos/Cortex:扩展 Prometheus 的存储容量,支持无限保留时间。 - InfluxDB:替代 Prometheus,适合更高写入吞吐场景。

元数据关联: - 将脚本版本、训练超参数等存储到 PostgreSQL,通过 script_name 关联时序数据。


  • 多维分析:通过标签组合快速定位瓶颈(如特定算子+特定输入规模)。
  • 实时性:秒级监控延迟,适合训练任务实时调优。
  • 历史回溯:对比不同版本/硬件下的性能表现。

建议先以 Pushgateway 实现最小原型,再逐步扩展为完整监控体系。

Step 2 : 剖析热点

  • 操作:
    • 借助VizTracer和torch_npu.profiler, 分阶段对热点算子进行分析
      • 开启record_shapes
      • 利用api开启layer层的分析
    • 各部分(step维度、函数维度、算子维度)对比NPU和GPU,提前定位劣化算子。
    • 数据集监控基本算子的性能吞吐。(时间,脚本,算子名称,输入参数规模,CANN/MM软件版本,性能)
  • 目的:从算子维度和函数模块维度分析性能不足之处。

layer层分析

PyTorch Profiler 默认情况下主要统计 算子层(如卷积、矩阵乘法等底层计算操作) 的耗时,但通过合理配置和扩展,可以实现对 模型层(如 nn.Module 子类) 的耗时分析。以下是具体方法:


1. 基础方法:启用层级跟踪(记录不需要的层,会耗费大量的时间,导致不准。)

PyTorch Profiler 可以结合模型结构信息,通过 with_stack=True 参数记录调用栈,间接关联代码层级:

with torch.profiler.profile(
      activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
      schedule=torch.profiler.schedule(wait=1, warmup=1, active=3),
      on_trace_ready=torch.profiler.tensorboard_trace_handler('./log'),
      with_stack=True  # 记录调用栈
) as profiler:
      for _ in range(5):
            model(inputs)
            profiler.step()

在生成的 Chrome Tracing 可视化结果中,可以通过调用栈回溯到具体的 nn.Module 层级。


3. 自动化标注:通过 Hook 绑定层级

遍历模型的所有子模块,自动为每个层添加标记

def add_profiler_hooks(model):
      for name, module in model.named_modules():
            if isinstance(module, nn.Module):
                  original_forward = module.forward
                  def wrapped_forward(*args, _name=name, _original=original_forward, **kwargs):
                        with record_function(f"Layer_{_name}"):
                              return _original(*args, **kwargs)
                  module.forward = wrapped_forward
return model

model = add_profiler_hooks(model)

代码端提取层级耗时

profiler.export_chrome_trace("trace.json")  # 导出为 Chrome Tracing 格式
events = profiler.key_averages(group_by_input_shape=True)
for event in events:
      if "Layer_" in event.key:
            print(f"Layer: {event.key}, CPU Time: {event.cpu_time}, CUDA Time: {event.cuda_time}")

Layer Name CPU Time (ms) CUDA Time (ms) Input Shape
Layer_encoder.blocks.0 12.3 8.2 [batch, 256, 768]
Layer_encoder.blocks.1 11.8 7.9 [batch, 256, 768]
Layer_classifier 5.1 3.4 [batch, 768]

  1. 性能开销:记录层级会增加 Profiler 的运行时间(约 5%~10%),建议仅在调试时开启。
  2. 反向传播跟踪:需配合 profile_memory=Truewith_stack=True 捕获反向传播耗时。
  3. 动态图限制:对于动态控制流(如循环、条件分支),需手动标记关键代码段。

通过以上方法,您可以将 PyTorch Profiler 的统计粒度从 算子级 提升到 模型层(用户定义层)级,更直观地定位性能瓶颈。

Step 3 : 深入理解

  • 操作:从计算数据量的估计和分配的角度,理解代码运行逻辑。
  • 目的:
    • 理解各个模块的意义,来实现理解模型的策略(见后续)。
    • 从整体模型的角度,探查有没有重复的计算和存储。

目标功能

  1. 打印函数输入和输出变量的参数量(tensor的大小),计算内存占用(乘上fp16或者fp32)
    1. 借助torchinfo
  2. 给出每个函数的时间复杂度公式(和输入参数有关)。
    1. 支持手动标记一些函数的时间复杂度公式。 1. 自定义的装饰器,让用户用注解指定公式,然后工具在运行时收集这些信息。比如用@profile_complexity("O(n^2)")这样的装饰器,记录函数的时间复杂度表达式。
    2. 支持基于已有的公式自动推导组合函数的时间复杂度公式。 1. 可以考虑SymPy
  3. 建模基本算子/MindSpeed元操作的性能吞吐,(反向通过实际执行时间除以计算量来估计)
    1. 支持监控不同时期的性能吞吐,如果有算子劣化或者增强,有报警机制。
  4. 支持静态性能分析:
    1. 基于上述的,参数量 + 时间复杂度公式 + 元操作吞吐,实现静态估计。

实现要求

  1. 支持低侵入代码来打印
    1. 装饰器
    2. 兼容cpprinter格式
  2. 可视化:
    1. 无论是通过大还是长,来感知到内存占用多,和耗时长的阶段。

草稿

  1. 加速的本质: 1. 理解模型的策略: 1. 合并同类型,消除冗余计算,重计算(时间换空间) 2. 探求更小计算复杂度的等价含义实现 3. 分析无依赖阶段,实现通讯计算掩盖, 4. 异构计算:NPU处理大量规整数据,CPU处理多变繁琐分支情况。 2. 不理解模型的策略: 1. 热点阶段多机并行 2. 更细粒度的划分调度,消除气泡 3. 常出现的计算模式写成融合算子
  2. 静态性能分析 1. 如果预测准确,说明对,模型结构、计算量,NPU算子能力,计算系统的通讯。都有详细的了解和估计。为此你需要知道 2. 模型各阶段的计算量,内存占用,通讯量(如何测量) 3. 计算系统:NPU各算子的吞吐量,通讯带宽。

参考文献

评论