From bd8cc971ba9169818a31f761cd178a12e888f097 Mon Sep 17 00:00:00 2001 From: wjchuee Date: Mon, 1 Sep 2025 11:29:10 +0800 Subject: [PATCH] msmonitor db develop part 3 --- msmonitor/docs/npumonitor.md | 7 +- .../IPCMonitor/dynamic_monitor_proxy.py | 4 +- msmonitor/plugin/README.md | 4 +- msmonitor/plugin/bindings.cpp | 5 +- .../plugin/ipc_monitor/DynoLogNpuMonitor.cpp | 32 +- .../plugin/ipc_monitor/DynoLogNpuMonitor.h | 2 - msmonitor/plugin/ipc_monitor/TimerTask.h | 8 +- .../plugin/ipc_monitor/db/Connection.cpp | 8 +- msmonitor/plugin/ipc_monitor/db/DBConstant.h | 1 + msmonitor/plugin/ipc_monitor/db/DBInfo.h | 42 ++ .../ipc_monitor/db/DBProcessManager.cpp | 427 ++++++++++++++++++ .../plugin/ipc_monitor/db/DBProcessManager.h | 159 +++++++ msmonitor/plugin/ipc_monitor/db/DataBase.cpp | 10 +- msmonitor/plugin/ipc_monitor/db/DataBase.h | 2 +- .../ipc_monitor/metric/MetricManager.cpp | 6 +- .../plugin/ipc_monitor/metric/MetricManager.h | 28 +- .../mspti_monitor/MsptiDataProcessBase.h | 43 ++ .../mspti_monitor/MsptiMonitor.cpp | 82 +++- .../ipc_monitor/mspti_monitor/MsptiMonitor.h | 18 +- msmonitor/plugin/ipc_monitor/utils.cpp | 5 +- ...347\275\221URL\350\257\264\346\230\216.md" | 5 +- 21 files changed, 826 insertions(+), 72 deletions(-) create mode 100644 msmonitor/plugin/ipc_monitor/db/DBInfo.h create mode 100644 msmonitor/plugin/ipc_monitor/db/DBProcessManager.cpp create mode 100644 msmonitor/plugin/ipc_monitor/db/DBProcessManager.h create mode 100644 msmonitor/plugin/ipc_monitor/mspti_monitor/MsptiDataProcessBase.h diff --git a/msmonitor/docs/npumonitor.md b/msmonitor/docs/npumonitor.md index 8dc112b68..2b1c94fff 100644 --- a/msmonitor/docs/npumonitor.md +++ b/msmonitor/docs/npumonitor.md @@ -24,6 +24,7 @@ npu-monitor的SUBCOMMANDS(子命令)选项如下: | --npu-monitor-stop | action | 停止性能监控,设置参数后生效,默认不生效 | Y | Y | N | | --report-interval-s | int | 性能监控数据上报周期,单位s,需要在启动时设置。默认值60 | Y | Y | N | | --mspti-activity-kind | String | 
性能监控数据上报数据类型,可以设置单个或多个,多个类型以逗号分隔,每次设置时刷新全局上报类型。可选值范围[`Marker`, `Kernel`, `API`, `Hccl`, `Memory`, `MemSet`, `MemCpy`, `Communication`] , 默认值`Marker` | Y | Y | N | +| --log-file | String | 性能数据采集落盘的路径,当前仅支持`mspti-activity-kind`设置为`Marker`,`Kernel`,`API`,`Communication`4类数据的导出, 落盘数据格式为db,db中内容说明请参考[msprof导出db格式数据说明](https://www.hiascend.com/document/detail/zh/canncommercial/82RC1/devaids/Profiling/atlasprofiling_16_1144.html),默认值为空,表示不落盘 | Y | Y | N | ## npu-monitor使用方法 @@ -79,7 +80,11 @@ dyno --certs-dir /home/client_certs npu-monitor --report-interval-s 30 --mspti-a # 上报周期30s, 上报数据类型Marker和Kernel dyno --certs-dir /home/client_certs npu-monitor --npu-monitor-start --report-interval-s 30 --mspti-activity-kind Marker,Kernel -# 示例5:多机场景下性能监控开启时修改配置 +# 示例5:性能监控开启时修改配置,开启数据采集落盘 +# 数据落盘路径为/tmp/msmonitor_db,落盘周期为30s,采集数据类型为Marker,Kernel,Communication +dyno --certs-dir /home/client_certs npu-monitor --npu-monitor-start --report-interval-s 30 --mspti-activity-kind Marker,Kernel,Communication --log-file /tmp/msmonitor_db + +# 示例6:多机场景下性能监控开启时修改配置 # 多机场景下向特定机器x.x.x.x发送参数信息,参数表示上报周期30s, 上报数据类型Marker和Kernel dyno --certs-dir /home/client_certs --hostname x.x.x.x npu-monitor --npu-monitor-start --report-interval-s 30 --mspti-activity-kind Marker,Kernel ``` diff --git a/msmonitor/plugin/IPCMonitor/dynamic_monitor_proxy.py b/msmonitor/plugin/IPCMonitor/dynamic_monitor_proxy.py index 80be4a427..2b3893554 100644 --- a/msmonitor/plugin/IPCMonitor/dynamic_monitor_proxy.py +++ b/msmonitor/plugin/IPCMonitor/dynamic_monitor_proxy.py @@ -17,7 +17,7 @@ import sys import os import importlib from .singleton import Singleton - +from .utils import get_parallel_group_info so_path = os.path.join(os.path.dirname(__file__), "lib64") sys.path.append(os.path.realpath(so_path)) @@ -37,6 +37,8 @@ class PyDynamicMonitorProxy: @classmethod def enable_dyno_npu_monitor(cls, config_map: dict): + if str(config_map.get("NPU_MONITOR_STOP")).lower() in ("true", "1"): + 
ipcMonitor_C_module.set_cluster_config_data({"parallel_group_info": get_parallel_group_info()}) ipcMonitor_C_module.enable_dyno_npu_monitor(config_map) @classmethod diff --git a/msmonitor/plugin/README.md b/msmonitor/plugin/README.md index b6e3a4ca2..b82d2a750 100644 --- a/msmonitor/plugin/README.md +++ b/msmonitor/plugin/README.md @@ -7,8 +7,8 @@ __PyDynamicMonitorProxy接口说明__: * `init_dyno` 向dynolog daemon发送注册请求 - * input: npuId(int) - * return: None + * input: npu_id(int) + * return: None * `poll_dyno` 向dynolog daemon获取Profiler控制参数 * input: None * return: str,返回控制参数 diff --git a/msmonitor/plugin/bindings.cpp b/msmonitor/plugin/bindings.cpp index 545c6c5f6..b67805685 100644 --- a/msmonitor/plugin/bindings.cpp +++ b/msmonitor/plugin/bindings.cpp @@ -16,7 +16,7 @@ #include #include #include "ipc_monitor/PyDynamicMonitorProxy.h" -#include "ipc_monitor/utils.h" +#include "ipc_monitor/mspti_monitor/MsptiMonitor.h" namespace py = pybind11; @@ -34,4 +34,7 @@ PYBIND11_MODULE(IPCMonitor_C, m) { m.def("finalize_dyno", []() -> void { dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::GetInstance()->FinalizeDyno(); }); + m.def("set_cluster_config_data", [](const std::unordered_map& cluster_config) -> void { + dynolog_npu::ipc_monitor::MsptiMonitor::GetInstance()->SetClusterConfigData(cluster_config); + }, py::arg("cluster_config")); } diff --git a/msmonitor/plugin/ipc_monitor/DynoLogNpuMonitor.cpp b/msmonitor/plugin/ipc_monitor/DynoLogNpuMonitor.cpp index 12da158ad..8e0fe68f8 100644 --- a/msmonitor/plugin/ipc_monitor/DynoLogNpuMonitor.cpp +++ b/msmonitor/plugin/ipc_monitor/DynoLogNpuMonitor.cpp @@ -18,6 +18,7 @@ #include #include #include "utils.h" +#include "MsptiMonitor.h" namespace dynolog_npu { namespace ipc_monitor { @@ -30,6 +31,7 @@ DynoLogNpuMonitor::DynoLogNpuMonitor() fprintf(stderr, "[INFO] [%d] Msmonitor log will record to %s\n", GetProcessId(), logPath.c_str()); logPath = logPath + "/msmonitor_"; google::InitGoogleLogging("MsMonitor"); + 
google::SetStderrLogging(google::GLOG_ERROR); google::SetLogDestination(google::GLOG_INFO, logPath.c_str()); google::SetLogFilenameExtension(".log"); } else { @@ -59,29 +61,31 @@ bool DynoLogNpuMonitor::Init() ErrCode DynoLogNpuMonitor::DealMonitorReq(MsptiMonitorCfg& cmd) { + auto msptiMonitor = MsptiMonitor::GetInstance(); if (cmd.monitorStop) { - if (msptiMonitor_.IsStarted()) { + if (msptiMonitor->IsStarted()) { LOG(INFO) << "Stop mspti monitor thread successfully"; - msptiMonitor_.Stop(); + msptiMonitor->Stop(); } return ErrCode::SUC; } - if (cmd.reportIntervals <= 0) { - cmd.reportIntervals = DEFAULT_FLUSH_INTERVAL; - LOG(WARNING) << "Invalid report interval, set to 60"; - } if (cmd.reportIntervals != 0) { - msptiMonitor_.SetFlushInterval(cmd.reportIntervals); + msptiMonitor->SetFlushInterval(cmd.reportIntervals); } - if (cmd.monitorStart && !msptiMonitor_.IsStarted()) { + if (cmd.monitorStart && !msptiMonitor->IsStarted()) { + if (!cmd.savePath.empty() && !msptiMonitor->CheckAndSetSavePath(cmd.savePath)) { + LOG(ERROR) << "Invalid log path, mspti monitor start failed"; + return ErrCode::PERMISSION; + } + LOG(INFO) << "Start mspti monitor thread successfully"; - msptiMonitor_.Start(); + msptiMonitor->Start(); } - if (msptiMonitor_.IsStarted() && !cmd.enableActivities.empty()) { - auto curActivities = msptiMonitor_.GetEnabledActivities(); + if (msptiMonitor->IsStarted() && !cmd.enableActivities.empty()) { + auto curActivities = msptiMonitor->GetEnabledActivities(); std::vector enableKinds; std::vector disableKinds; std::set_difference(cmd.enableActivities.begin(), cmd.enableActivities.end(), curActivities.begin(), curActivities.end(), @@ -89,10 +93,10 @@ ErrCode DynoLogNpuMonitor::DealMonitorReq(MsptiMonitorCfg& cmd) std::set_difference(curActivities.begin(), curActivities.end(), cmd.enableActivities.begin(), cmd.enableActivities.end(), std::back_inserter(disableKinds)); for (auto activity : enableKinds) { - msptiMonitor_.EnableActivity(activity); + 
msptiMonitor->EnableActivity(activity); } for (auto activity : disableKinds) { - msptiMonitor_.DisableActivity(activity); + msptiMonitor->DisableActivity(activity); } } return ErrCode::SUC; @@ -125,7 +129,7 @@ void DynoLogNpuMonitor::EnableMsptiMonitor(std::unordered_mapUninit(); } } // namespace ipc_monitor } // namespace dynolog_npu diff --git a/msmonitor/plugin/ipc_monitor/DynoLogNpuMonitor.h b/msmonitor/plugin/ipc_monitor/DynoLogNpuMonitor.h index 5ffec3bd9..50ea01723 100644 --- a/msmonitor/plugin/ipc_monitor/DynoLogNpuMonitor.h +++ b/msmonitor/plugin/ipc_monitor/DynoLogNpuMonitor.h @@ -18,7 +18,6 @@ #include "MonitorBase.h" #include "NpuIpcClient.h" -#include "MsptiMonitor.h" #include "singleton.h" #include "InputParser.h" @@ -49,7 +48,6 @@ private: bool isInitialized_ = false; int32_t npuId_ = 0; IpcClient ipcClient_; - MsptiMonitor msptiMonitor_; }; } // namespace ipc_monitor diff --git a/msmonitor/plugin/ipc_monitor/TimerTask.h b/msmonitor/plugin/ipc_monitor/TimerTask.h index f47e4901d..beacab875 100644 --- a/msmonitor/plugin/ipc_monitor/TimerTask.h +++ b/msmonitor/plugin/ipc_monitor/TimerTask.h @@ -74,8 +74,8 @@ public: interval.store(intervalTimes); } - virtual void InitResource() {}; - virtual void ReleaseResource() {}; + virtual void RunPreTask() {}; + virtual void RunPostTask() {}; virtual void ExecuteTask() = 0; bool IsRunning() { return running.load(); } private: @@ -83,7 +83,7 @@ private: void TaskRun() { LOG(INFO) << name << " Timer task started."; - InitResource(); + RunPreTask(); while (running) { std::unique_lock lock(cv_mutex); if (interval.load()) { @@ -101,7 +101,7 @@ private: ExecuteTask(); } } - ReleaseResource(); + RunPostTask(); LOG(INFO) << name << " Timer task stopped."; } diff --git a/msmonitor/plugin/ipc_monitor/db/Connection.cpp b/msmonitor/plugin/ipc_monitor/db/Connection.cpp index a142c6ee1..6934c14f9 100644 --- a/msmonitor/plugin/ipc_monitor/db/Connection.cpp +++ b/msmonitor/plugin/ipc_monitor/db/Connection.cpp @@ -76,8 +76,12 @@ 
bool Connection::ExecuteSql(const std::string &sql, const std::string &sqlType) bool Connection::CheckTableExists(const std::string &tableName) { - std::string sql = "SELECT COUNT(1) FROM " + tableName; - return ExecuteSql(sql, CHECK); + std::string sql{"SELECT 1 FROM sqlite_master WHERE type='table' AND name='" + tableName + "' LIMIT 1"}; + std::vector> result; + if(ExecuteQuery(sql, result)) { + return !result.empty(); + } + return false; } bool Connection::ExecuteCreateTable(const std::string &sql) diff --git a/msmonitor/plugin/ipc_monitor/db/DBConstant.h b/msmonitor/plugin/ipc_monitor/db/DBConstant.h index 41220390e..1fb35b318 100644 --- a/msmonitor/plugin/ipc_monitor/db/DBConstant.h +++ b/msmonitor/plugin/ipc_monitor/db/DBConstant.h @@ -28,6 +28,7 @@ const std::string SQL_REAL_TYPE = "REAL"; const std::string SQL_NUMERIC_TYPE = "NUMERIC"; const std::string TABLE_STRING_IDS = "STRING_IDS"; +const std::string TABLE_SESSION_TIME_INFO = "SESSION_TIME_INFO"; const std::string TABLE_CANN_API = "CANN_API"; const std::string TABLE_TASK = "TASK"; const std::string TABLE_COMPUTE_TASK_INFO = "COMPUTE_TASK_INFO"; diff --git a/msmonitor/plugin/ipc_monitor/db/DBInfo.h b/msmonitor/plugin/ipc_monitor/db/DBInfo.h new file mode 100644 index 000000000..b729c105f --- /dev/null +++ b/msmonitor/plugin/ipc_monitor/db/DBInfo.h @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2025-2025. Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef IPC_MONITOR_DB_INFO_H +#define IPC_MONITOR_DB_INFO_H + +#include "db/DBRunner.h" +#include "db/DataBase.h" + +namespace dynolog_npu { +namespace ipc_monitor { +namespace db { +struct DBInfo { + DBInfo() = default; + + bool ConstructDBRunner(const std::string& dbPath) + { + dbRunner = std::make_shared(dbPath); + return dbRunner != nullptr; + } + + std::shared_ptr database{nullptr}; + std::shared_ptr dbRunner{nullptr}; +}; +} // namespace db +} // namespace ipc_monitor +} // namespace dynolog_npu + +#endif // IPC_MONITOR_DB_INFO_H diff --git a/msmonitor/plugin/ipc_monitor/db/DBProcessManager.cpp b/msmonitor/plugin/ipc_monitor/db/DBProcessManager.cpp new file mode 100644 index 000000000..7e51a17d3 --- /dev/null +++ b/msmonitor/plugin/ipc_monitor/db/DBProcessManager.cpp @@ -0,0 +1,427 @@ +/* + * Copyright (C) 2025-2025. Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "db/DBProcessManager.h" +#include "db/DBConstant.h" +#include "singleton.h" + +namespace dynolog_npu { +namespace ipc_monitor { +namespace db { +namespace { +constexpr uint64_t MSTX_CONNECTION_ID_OFFSET = 4000000000ULL; +const std::string MSTX_TASK_TYPE = "MsTx"; +const std::string NA = "N/A"; +const std::vector> HCCL_DATA_TYPE = { + {msptiCommunicationDataType::MSPTI_ACTIVITY_COMMUNICATION_INT8, "INT8"}, + {msptiCommunicationDataType::MSPTI_ACTIVITY_COMMUNICATION_INT16, "INT16"}, + {msptiCommunicationDataType::MSPTI_ACTIVITY_COMMUNICATION_INT32, "INT32"}, + {msptiCommunicationDataType::MSPTI_ACTIVITY_COMMUNICATION_INT64, "INT64"}, + {msptiCommunicationDataType::MSPTI_ACTIVITY_COMMUNICATION_UINT8, "UINT8"}, + {msptiCommunicationDataType::MSPTI_ACTIVITY_COMMUNICATION_UINT16, "UINT16"}, + {msptiCommunicationDataType::MSPTI_ACTIVITY_COMMUNICATION_UINT32, "UINT32"}, + {msptiCommunicationDataType::MSPTI_ACTIVITY_COMMUNICATION_UINT64, "UINT64"}, + {msptiCommunicationDataType::MSPTI_ACTIVITY_COMMUNICATION_FP16, "FP16"}, + {msptiCommunicationDataType::MSPTI_ACTIVITY_COMMUNICATION_FP32, "FP32"}, + {msptiCommunicationDataType::MSPTI_ACTIVITY_COMMUNICATION_FP64, "FP64"}, + {msptiCommunicationDataType::MSPTI_ACTIVITY_COMMUNICATION_BFP16, "BFP16"}, + {msptiCommunicationDataType::MSPTI_ACTIVITY_COMMUNICATION_INT128, "INT128"}, + {msptiCommunicationDataType::MSPTI_ACTIVITY_COMMUNICATION_INVALID_TYPE, "INVALID_TYPE"} +}; + +constexpr uint16_t MSTX_MARKER_TYPE = 0; +constexpr uint16_t MSTX_RANGE_TYPE = 2; +const std::vector> MSTX_EVENT_TYPE = { + {0, "marker"}, + {1, "push/pop"}, + {2, "start/end"}, + {3, "marker_ex"} +}; + +const std::vector> META_DATA = { + {"SCHEMA_VERSION_MICRO", "1"}, + {"SCHEMA_VERSION_MINOR", "1"}, + {"SCHEMA_VERSION_MAJOR", "1"}, + {"SCHEMA_VERSION", "1.1.1"} +}; + +constexpr uint16_t API_NODE_TYPE = 10000; +const std::vector> API_TYPE = { + {5000, "runtime"}, + {5500, "hccl"}, + {10000, "node"}, + {15000, "model"}, + {20000, "acl"}, + 
{50001, "op"}, + {50002, "queue"}, + {50003, "trace"}, + {50004, "mstx"} +}; + +uint64_t ConcatGlobalTid(uint32_t pid, uint32_t tid) /* pid goes to the high 32 bits, tid to the low 32 bits */ +{ + constexpr uint32_t INT32_BIT_COUNT = 32; + return (static_cast(pid) << INT32_BIT_COUNT) | tid; +} + +std::string GetMsmonitorDbPath(const std::string &outputPath) /* <outputPath>/msmonitor_<pid>_<timestamp>_<rank>.db */ +{ + auto identity = join({std::to_string(GetProcessId()), getCurrentTimestamp(), std::to_string(GetRankId())}, "_"); + return outputPath + "/msmonitor_" + identity + ".db"; +} +} // namespace + +class IdPool : public Singleton { /* maps strings to sequential uint64 ids; both accessors lock uint64IdMapMutex_ */ +public: + IdPool() = default; + ~IdPool() = default; + uint64_t GetUint64Id(const std::string &key); + StringIdFormat GetStringIdData(); + +private: + uint64_t uint64Index_{0}; + std::mutex uint64IdMapMutex_; + std::unordered_map uint64IdMap_; +}; + +uint64_t IdPool::GetUint64Id(const std::string &key) /* returns the id already mapped to key, else assigns the next one */ +{ + std::lock_guard lock(uint64IdMapMutex_); + auto it = uint64IdMap_.find(key); + if (it != uint64IdMap_.end()) { + return it->second; + } + uint64IdMap_.emplace(key, uint64Index_); + return uint64Index_++; +} + +StringIdFormat IdPool::GetStringIdData() /* copies the pool out as (id, string) rows */ +{ + std::lock_guard lock(uint64IdMapMutex_); + StringIdFormat stringIdData; + stringIdData.reserve(uint64IdMap_.size()); + for (const auto &it : uint64IdMap_) { /* by reference: no per-entry copy of the key string */ + stringIdData.emplace_back(it.second, it.first); + } + return stringIdData; +} + +void DBProcessManager::SetReportInterval(uint32_t interval) /* no-op when the interval is unchanged */ +{ + if (reportInterval_.load() != interval) { + LOG(INFO) << "DBProcessManager SetReportInterval interval: " << interval; + if (IsRunning()) { /* save buffered data before the interval changes */ + SaveData(); + } + SetInterval(interval); + reportInterval_.store(interval); + } +} + +void DBProcessManager::RunPreTask() /* records the session start timestamp */ +{ + sessionStartTime_ = getCurrentTimestamp64(); +} + +void DBProcessManager::ExecuteTask() /* calls SaveData and logs an error if it fails */ +{ + if (!SaveData()) { + LOG(ERROR) << "DBProcessManager SaveData failed"; + } +} + +bool DBProcessManager::CheckAndInitDB() +{ + std::lock_guard lock(dbMutex_); + if(msMonitorDB_.database == nullptr || msMonitorDB_.dbRunner == nullptr) { +
std::shared_ptr msMonitorDB{nullptr}; + MakeSharedPtr(msMonitorDB); + msMonitorDB_.database = msMonitorDB; + auto dbPath = GetMsmonitorDbPath(savePath_); + LOG(INFO) << "msMonitor db will be save to " << dbPath; + return msMonitorDB_.database != nullptr && msMonitorDB_.ConstructDBRunner(dbPath); + } + return true; +} + +bool DBProcessManager::SaveData() +{ + if(!CheckAndInitDB()) { + LOG(ERROR) << "DBProcessManager init msmonitor db failed"; + return false; + } + + bool flag = true; + APIFormat apiData; + CommunicationOpFormat communicationOpData; + TaskFormat taskData; + ComputeTaskInfoFormat computeTaskInfoData; + MstxFormat mstxData; + + { + std::lock_guard lock(dataMutex_); + apiData = std::move(apiData_); + communicationOpData = std::move(communicationOpData_); + taskData = std::move(taskData_); + computeTaskInfoData = std::move(computeTaskInfoData_); + mstxData = std::move(mstxData_); + } + + flag = (apiData.empty() || SaveIncDataToDB(apiData, TABLE_CANN_API)) && flag; + flag = (communicationOpData.empty() || SaveIncDataToDB(communicationOpData, TABLE_COMMUNICATION_OP)) && flag; + flag = (taskData.empty() || SaveIncDataToDB(taskData, TABLE_TASK)) && flag; + flag = (computeTaskInfoData.empty() || SaveIncDataToDB(computeTaskInfoData, TABLE_COMPUTE_TASK_INFO)) && flag; + flag = (mstxData.empty() || SaveIncDataToDB(mstxData, TABLE_MSTX)) && flag; + + return flag; +} + +bool DBProcessManager::SaveConstantData() +{ + bool flag = true; + flag = InsertDataToDB(HCCL_DATA_TYPE, TABLE_HCCL_DATA_TYPE, msMonitorDB_) && flag; + flag = InsertDataToDB(MSTX_EVENT_TYPE, TABLE_MSTX_EVENT_TYPE, msMonitorDB_) && flag; + flag = InsertDataToDB(API_TYPE, TABLE_API_TYPE, msMonitorDB_) && flag; + flag = InsertDataToDB(META_DATA, TABLE_META_DATA, msMonitorDB_) && flag; + + std::vector> hostInfoData {{GetHostUid(), GetHostName()}}; + flag = InsertDataToDB(hostInfoData, TABLE_HOST_INFO, msMonitorDB_) && flag; + + std::vector> sessionTimeInfoData {{sessionStartTime_, 
getCurrentTimestamp64()}}; + flag = InsertDataToDB(sessionTimeInfoData, TABLE_SESSION_TIME_INFO, msMonitorDB_) && flag; + + auto stringIdData = IdPool::GetInstance()->GetStringIdData(); + flag = (stringIdData.empty() || InsertDataToDB(stringIdData, TABLE_STRING_IDS, msMonitorDB_)) && flag; + return flag; +} + +bool DBProcessManager::SaveParallelGroupData() +{ + const std::string parallel_group_info_key = "parallel_group_info"; + auto iter = clusterConfigData.find(parallel_group_info_key); + if (iter == clusterConfigData.end()) { + LOG(WARNING) << "DBProcessManager SaveParallelGroupData parallel_group_info is not found"; + return true; + } + const std::string& parallel_group_info = iter->second; + if (!parallel_group_info.empty()) { + std::vector> data {{parallel_group_info_key, parallel_group_info}}; + return InsertDataToDB(data, TABLE_META_DATA, msMonitorDB_); + } + return true; +} + +bool DBProcessManager::SaveRankDeviceData() +{ + if (msMonitorDB_.dbRunner->CheckTableExists(TABLE_RANK_DEVICE_MAP)) { + return true; + } + if (deviceSet_.empty()) { + return false; + } + auto rankId = GetRankId(); + std::vector> rankDeviceData; + rankDeviceData.reserve(deviceSet_.size()); + for (auto deviceId : deviceSet_) { + rankDeviceData.emplace_back(rankId, deviceId); + } + if (!InsertDataToDB(rankDeviceData, TABLE_RANK_DEVICE_MAP, msMonitorDB_)) { + LOG(ERROR) << "DBProcessManager insert rank device map data failed"; + return false; + } + return true; +} + +void DBProcessManager::RunPostTask() +{ + SaveData(); + + std::lock_guard lock(dataMutex_); + if (hasSavedData_) { + if (CheckAndInitDB()) { + SaveConstantData(); + SaveParallelGroupData(); + SaveRankDeviceData(); + } else { + LOG(ERROR) << "DBProcessManager init msmonitor db failed"; + } + } + sessionStartTime_ = 0; + hasSavedData_ = false; + reportInterval_.store(DEFAULT_FLUSH_INTERVAL); + deviceSet_.clear(); + apiData_.clear(); + computeTaskInfoData_.clear(); + communicationOpData_.clear(); + taskData_.clear(); + 
mstxData_.clear(); + mstxRangeHostDataMap_.clear(); + mstxRangeDeviceDataMap_.clear(); + savePath_.clear(); + msMonitorDB_.database = nullptr; + msMonitorDB_.dbRunner = nullptr; +} + +void DBProcessManager::ProcessApiData(msptiActivityApi *record) +{ + std::lock_guard lock(dataMutex_); + uint64_t name = IdPool::GetInstance()->GetUint64Id(record->name); + uint64_t globalTid = ConcatGlobalTid(record->pt.processId, record->pt.threadId); + uint64_t connectionId = record->correlationId; + apiData_.emplace_back(static_cast(record->start), static_cast(record->end), + API_NODE_TYPE, globalTid, connectionId, name); +} + +std::string DBProcessManager::ConstructCommOpName(const std::string &opName, const std::string &groupName) +{ + uint64_t opCount = communicationGroupOpCount_[groupName]++; + std::string groupId; + auto it = communicationGroupNameMap_.find(groupName); + if (it == communicationGroupNameMap_.end()) { + static const size_t GROUP_ID_LEN = 3; + auto groupHashId = std::to_string(CalcHashId(groupName)); + if (groupHashId.size() >= GROUP_ID_LEN) { + groupHashId = groupHashId.substr(groupHashId.size()-GROUP_ID_LEN); + } + communicationGroupNameMap_.emplace(groupName, groupHashId); + groupId = groupHashId; + } else { + groupId = it->second; + } + return opName + "_" + groupId + "_" + std::to_string(opCount) + "_1"; +} + +void DBProcessManager::ProcessCommunicationData(msptiActivityCommunication *record) +{ + std::lock_guard lock(dataMutex_); + uint64_t groupName = IdPool::GetInstance()->GetUint64Id(record->commName); + auto commOpName = ConstructCommOpName(record->name, record->commName); + uint64_t opName = IdPool::GetInstance()->GetUint64Id(commOpName); + uint32_t opId = communicationOpId_.fetch_add(1); + uint64_t algType = IdPool::GetInstance()->GetUint64Id(record->algType); + uint64_t opType = IdPool::GetInstance()->GetUint64Id(record->name); + uint64_t connectionId = record->correlationId; + communicationOpData_.emplace_back(opName, static_cast(record->start), 
static_cast(record->end), + connectionId, groupName, opId, 0, 0, static_cast(record->dataType), + algType, static_cast(record->count), opType); +} + +void DBProcessManager::ProcessKernelData(msptiActivityKernel *record) +{ + std::lock_guard lock(dataMutex_); + uint64_t opName = IdPool::GetInstance()->GetUint64Id(record->name); + uint64_t taskType = IdPool::GetInstance()->GetUint64Id(record->type); + uint64_t globalTaskId = globalTaskId_.fetch_add(1); + uint64_t NAId = IdPool::GetInstance()->GetUint64Id(NA); + computeTaskInfoData_.emplace_back(opName, globalTaskId, UINT32_MAX, UINT32_MAX, taskType, + NAId, NAId, NAId, NAId, NAId, NAId, NAId, NAId, NAId, NAId); + uint64_t connectionId = record->correlationId; + uint32_t deviceId = record->ds.deviceId; + taskData_.emplace_back(static_cast(record->start), static_cast(record->end), + deviceId, connectionId, globalTaskId, GetProcessId(), taskType, UINT32_MAX, + static_cast(record->ds.streamId), UINT32_MAX, UINT32_MAX); + deviceSet_.insert(deviceId); +} + +void DBProcessManager::ProcessMstxData(msptiActivityMarker *record) +{ + std::lock_guard lock(dataMutex_); + if (record->sourceKind == msptiActivitySourceKind::MSPTI_ACTIVITY_SOURCE_KIND_HOST) { + ProcessMstxHostData(record); + } else if (record->sourceKind == msptiActivitySourceKind::MSPTI_ACTIVITY_SOURCE_KIND_DEVICE) { + ProcessMstxDeviceData(record); + } +} + +void DBProcessManager::ProcessMstxHostData(msptiActivityMarker *record) +{ + uint64_t connectionId = record->id + MSTX_CONNECTION_ID_OFFSET; + uint64_t timestamp = static_cast(record->timestamp); + uint64_t message = IdPool::GetInstance()->GetUint64Id(record->name); + uint64_t domain = IdPool::GetInstance()->GetUint64Id(record->domain); + uint64_t globalTid = ConcatGlobalTid(record->objectId.pt.processId, record->objectId.pt.threadId); + if (record->flag == msptiActivityFlag::MSPTI_ACTIVITY_FLAG_MARKER_INSTANTANEOUS || + record->flag == msptiActivityFlag::MSPTI_ACTIVITY_FLAG_MARKER_INSTANTANEOUS_WITH_DEVICE) { 
+ mstxData_.emplace_back(timestamp, timestamp, MSTX_MARKER_TYPE, UINT32_MAX, UINT32_MAX, + message, globalTid, globalTid, domain, connectionId); + } else if (record->flag == msptiActivityFlag::MSPTI_ACTIVITY_FLAG_MARKER_START || + record->flag == msptiActivityFlag::MSPTI_ACTIVITY_FLAG_MARKER_START_WITH_DEVICE) { + mstxRangeHostDataMap_.emplace(connectionId, MstxHostData{connectionId, timestamp, globalTid, domain, message}); + } else if (record->flag == msptiActivityFlag::MSPTI_ACTIVITY_FLAG_MARKER_END || + record->flag == msptiActivityFlag::MSPTI_ACTIVITY_FLAG_MARKER_END_WITH_DEVICE) { + auto it = mstxRangeHostDataMap_.find(connectionId); + if (it != mstxRangeHostDataMap_.end()) { + mstxData_.emplace_back(it->second.timestamp, timestamp, MSTX_RANGE_TYPE, UINT32_MAX, UINT32_MAX, + it->second.message, it->second.globalTid, globalTid, it->second.domain, connectionId); + mstxRangeHostDataMap_.erase(it); + } + } +} + +void DBProcessManager::ProcessMstxDeviceData(msptiActivityMarker *record) +{ + uint64_t connectionId = record->id + MSTX_CONNECTION_ID_OFFSET; + uint64_t timestamp = static_cast(record->timestamp); + uint64_t taskType = IdPool::GetInstance()->GetUint64Id(MSTX_TASK_TYPE); + if (record->flag == msptiActivityFlag::MSPTI_ACTIVITY_FLAG_MARKER_INSTANTANEOUS_WITH_DEVICE) { + taskData_.emplace_back(timestamp, timestamp, + static_cast(record->objectId.ds.deviceId), connectionId, + globalTaskId_.fetch_add(1), GetProcessId(), taskType, UINT32_MAX, + static_cast(record->objectId.ds.streamId), UINT32_MAX, UINT32_MAX); + } else if (record->flag == msptiActivityFlag::MSPTI_ACTIVITY_FLAG_MARKER_START_WITH_DEVICE) { + mstxRangeDeviceDataMap_.emplace(connectionId, + MstxDeviceData{connectionId, timestamp, globalTaskId_.fetch_add(1)}); + } else if (record->flag == msptiActivityFlag::MSPTI_ACTIVITY_FLAG_MARKER_END_WITH_DEVICE) { + auto it = mstxRangeDeviceDataMap_.find(connectionId); + if (it != mstxRangeDeviceDataMap_.end()) { + uint32_t deviceId = 
static_cast(record->objectId.ds.deviceId); + taskData_.emplace_back(it->second.timestamp, timestamp, + deviceId, connectionId, it->second.globalTaskId, GetProcessId(), taskType, + UINT32_MAX, static_cast(record->objectId.ds.streamId), UINT32_MAX, UINT32_MAX); + mstxRangeDeviceDataMap_.erase(it); + deviceSet_.insert(deviceId); + } + } +} + +ErrCode DBProcessManager::ConsumeMsptiData(msptiActivity *record) +{ + if (record == nullptr) { + LOG(ERROR) << "DBProcessManager::ConsumeMsptiData record is null"; + return ErrCode::VALUE; + } + switch (record->kind) { + case msptiActivityKind::MSPTI_ACTIVITY_KIND_API: + ProcessApiData(ReinterpretConvert(record)); + break; + case msptiActivityKind::MSPTI_ACTIVITY_KIND_COMMUNICATION: + ProcessCommunicationData(ReinterpretConvert(record)); + break; + case msptiActivityKind::MSPTI_ACTIVITY_KIND_KERNEL: + ProcessKernelData(ReinterpretConvert(record)); + break; + case msptiActivityKind::MSPTI_ACTIVITY_KIND_MARKER: + ProcessMstxData(ReinterpretConvert(record)); + break; + default: + LOG(WARNING) << record->kind << " is not supported for DBProcessManager"; + break; + } + return ErrCode::SUC; +} +} // namespace db +} // namespace ipc_monitor +} // namespace dynolog_npu diff --git a/msmonitor/plugin/ipc_monitor/db/DBProcessManager.h b/msmonitor/plugin/ipc_monitor/db/DBProcessManager.h new file mode 100644 index 000000000..100e56fda --- /dev/null +++ b/msmonitor/plugin/ipc_monitor/db/DBProcessManager.h @@ -0,0 +1,159 @@ +/* + * Copyright (C) 2025-2025. Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef IPC_MONITOR_DB_PROCESS_MANAGER_H +#define IPC_MONITOR_DB_PROCESS_MANAGER_H + +#include +#include +#include +#include "MsptiDataProcessBase.h" +#include "db/DBInfo.h" + +namespace dynolog_npu { +namespace ipc_monitor { +namespace db { +// STRING_IDS: id, value +using StringIdFormat = std::vector>; +// CANN_API: startNs, endNs, type, globalTid, connectionId, name +using APIFormat = std::vector>; +// COMMUNICATION_OP: opName, startNs, endNs, connectionId, groupName, +// opId, relay, retry, dataType, algType, count, opType +using CommunicationOpFormat = std::vector>; +// COMPUTE_TASK_INFO: name, globalTaskId, blockDim, mixBlockDim, taskType, opType, inputFormats, inputDataTypes, +// inputShapes, outputFormats, outputDataTypes, outputShapes, attrInfo, opState, hf32Eligible +using ComputeTaskInfoFormat = std::vector>; +// TASK: startNs, endNs, deviceId, connectionId, globalTaskId, +// globalPid, taskType, contextId, streamId, taskId, modelId +using TaskFormat = std::vector>; +// MSTX: startNs, endNs, eventType, rangeId, category, +// message, globalTid, endGlobalTid, domainId, connectionId +using MstxFormat = std::vector>; + +struct MstxHostData { + uint64_t connectionId; + uint64_t timestamp; + uint64_t globalTid; + uint64_t domain; + uint64_t message; +}; + +struct MstxDeviceData { + uint64_t connectionId; + uint64_t timestamp; + uint64_t globalTaskId; +}; + +class DBProcessManager : public MsptiDataProcessBase { +public: + DBProcessManager(std::string savePath) + : MsptiDataProcessBase("DBProcessManager"), savePath_(std::move(savePath)) 
{} + ~DBProcessManager() = default; + ErrCode ConsumeMsptiData(msptiActivity *record) override; + void SetReportInterval(uint32_t interval) override; + void RunPreTask() override; + void ExecuteTask() override; + void RunPostTask() override; + +private: + void ProcessApiData(msptiActivityApi *record); + void ProcessCommunicationData(msptiActivityCommunication *record); + void ProcessKernelData(msptiActivityKernel *record); + void ProcessMstxData(msptiActivityMarker *record); + void ProcessMstxHostData(msptiActivityMarker *record); + void ProcessMstxDeviceData(msptiActivityMarker *record); + bool CheckAndInitDB(); + bool SaveData(); + bool SaveConstantData(); + bool SaveParallelGroupData(); + bool SaveRankDeviceData(); + std::string ConstructCommOpName(const std::string &opName, const std::string &groupName); + template + bool SaveIncDataToDB(const std::vector> &data, const std::string &tableName); + +private: + uint64_t sessionStartTime_{0}; + std::string savePath_; + std::mutex dbMutex_; + DBInfo msMonitorDB_; + std::atomic reportInterval_{0}; + + std::mutex dataMutex_; + bool hasSavedData_{false}; + std::unordered_set deviceSet_; + // api data + APIFormat apiData_; + // communication data + std::atomic communicationOpId_{0}; + std::unordered_map communicationGroupOpCount_; + std::unordered_map communicationGroupNameMap_; + CommunicationOpFormat communicationOpData_; + // compute task info data + std::atomic globalTaskId_{0}; + ComputeTaskInfoFormat computeTaskInfoData_; + // task data + TaskFormat taskData_; + // mstx data + std::unordered_map mstxRangeHostDataMap_; + std::unordered_map mstxRangeDeviceDataMap_; + MstxFormat mstxData_; +}; + +template +bool InsertDataToDB(const std::vector> &data, const std::string &tableName, DBInfo &msMonitorDB) +{ + LOG(INFO) << "InsertDataToDB tableName: " << tableName; + if (data.empty()) { + LOG(WARNING) << tableName << " is empty"; + return true; + } + if (msMonitorDB.dbRunner == nullptr) { + LOG(ERROR) << "msMonitorDB 
dbRunner is null"; + return false; + } + if (msMonitorDB.database == nullptr) { + LOG(ERROR) << "msMonitorDB database is null"; + return false; + } + if (!msMonitorDB.dbRunner->CreateTable(tableName, msMonitorDB.database->GetTableCols(tableName))) { + LOG(ERROR) << "msMonitorDB " << tableName << " CreateTable failed"; + return false; + } + if (!msMonitorDB.dbRunner->InsertData(tableName, data)) { + LOG(ERROR) << "msMonitorDB " << tableName << " InsertData failed"; + return false; + } + return true; +} + +template +bool DBProcessManager::SaveIncDataToDB(const std::vector> &data, const std::string &tableName) +{ + if (data.empty()) { + LOG(WARNING) << tableName << " is empty"; + return true; + } + bool ret = InsertDataToDB(data, tableName, msMonitorDB_); + hasSavedData_ = hasSavedData_ || ret; + return ret; +} +} // namespace db +} // namespace ipc_monitor +} // namespace dynolog_npu +#endif // IPC_MONITOR_DB_PROCESS_MANAGER_H diff --git a/msmonitor/plugin/ipc_monitor/db/DataBase.cpp b/msmonitor/plugin/ipc_monitor/db/DataBase.cpp index f82c1d28f..a3197a5d9 100644 --- a/msmonitor/plugin/ipc_monitor/db/DataBase.cpp +++ b/msmonitor/plugin/ipc_monitor/db/DataBase.cpp @@ -26,6 +26,11 @@ const TableColumns STRING_IDS = { {"value", SQL_TEXT_TYPE} }; +const TableColumns SESSION_TIME_INFO = { + {"startTimeNs", SQL_INT_TYPE}, + {"endTimeNs", SQL_INT_TYPE} +}; + const TableColumns ENUM_TABLE = { {"id", SQL_INT_TYPE, true}, {"name", SQL_TEXT_TYPE} @@ -108,7 +113,7 @@ const TableColumns MSTX = { {"eventType", SQL_INT_TYPE}, {"rangeId", SQL_INT_TYPE}, {"category", SQL_INT_TYPE}, - {"meassge", SQL_INT_TYPE}, + {"message", SQL_INT_TYPE}, {"globalTid", SQL_INT_TYPE}, {"endGlobalTid", SQL_INT_TYPE}, {"domainId", SQL_INT_TYPE}, @@ -116,7 +121,7 @@ const TableColumns MSTX = { }; } // namespace -TableColumns Database::GetTableCols(const std::string &tableName) +const TableColumns& Database::GetTableCols(const std::string &tableName) { auto iter = tableColumns_.find(tableName); if (iter 
== tableColumns_.end()) { @@ -131,6 +136,7 @@ MsMonitorDB::MsMonitorDB() dbName_ = "msmonitor.db"; tableColumns_ = { {TABLE_STRING_IDS, STRING_IDS}, + {TABLE_SESSION_TIME_INFO, SESSION_TIME_INFO}, {TABLE_COMMUNICATION_OP, COMMUNICATION_OP}, {TABLE_HCCL_DATA_TYPE, ENUM_TABLE}, {TABLE_MSTX, MSTX}, diff --git a/msmonitor/plugin/ipc_monitor/db/DataBase.h b/msmonitor/plugin/ipc_monitor/db/DataBase.h index 851d67a19..2aa847016 100644 --- a/msmonitor/plugin/ipc_monitor/db/DataBase.h +++ b/msmonitor/plugin/ipc_monitor/db/DataBase.h @@ -31,7 +31,7 @@ public: virtual ~Database() = default; void SetDBName(std::string dbName) { dbName_ = std::move(dbName); } std::string GetDBName() const { return dbName_; } - TableColumns GetTableCols(const std::string &tableName); + const TableColumns& GetTableCols(const std::string &tableName); protected: std::string dbName_; std::unordered_map tableColumns_; diff --git a/msmonitor/plugin/ipc_monitor/metric/MetricManager.cpp b/msmonitor/plugin/ipc_monitor/metric/MetricManager.cpp index b5661ffa8..e2306f0ef 100644 --- a/msmonitor/plugin/ipc_monitor/metric/MetricManager.cpp +++ b/msmonitor/plugin/ipc_monitor/metric/MetricManager.cpp @@ -28,7 +28,7 @@ namespace dynolog_npu { namespace ipc_monitor { namespace metric { -MetricManager::MetricManager(): TimerTask("MetricManager", DEFAULT_FLUSH_INTERVAL), +MetricManager::MetricManager(): MsptiDataProcessBase("MetricManager"), kindSwitchs_(MSPTI_ACTIVITY_KIND_COUNT), consumeStatus_(MSPTI_ACTIVITY_KIND_COUNT) { metrics.resize(MSPTI_ACTIVITY_KIND_COUNT); metrics[MSPTI_ACTIVITY_KIND_KERNEL] = std::make_shared(); @@ -41,7 +41,7 @@ MetricManager::MetricManager(): TimerTask("MetricManager", DEFAULT_FLUSH_INTERVA metrics[MSPTI_ACTIVITY_KIND_COMMUNICATION] = std::make_shared(); } -void MetricManager::ReleaseResource() +void MetricManager::RunPostTask() { for (int i = 0; i < MSPTI_ACTIVITY_KIND_COUNT; i++) { if (kindSwitchs_[i].load()) { @@ -86,7 +86,7 @@ void MetricManager::SendMetricMsg() } } -void 
MetricManager::EnableKindSwitch_(msptiActivityKind kind, bool flag) +void MetricManager::EnableKindSwitch(msptiActivityKind kind, bool flag) { kindSwitchs_[kind] = flag; } diff --git a/msmonitor/plugin/ipc_monitor/metric/MetricManager.h b/msmonitor/plugin/ipc_monitor/metric/MetricManager.h index 262dc19b8..55d6aa768 100644 --- a/msmonitor/plugin/ipc_monitor/metric/MetricManager.h +++ b/msmonitor/plugin/ipc_monitor/metric/MetricManager.h @@ -18,33 +18,31 @@ #include #include - -#include "utils.h" -#include "singleton.h" -#include "mspti.h" -#include "TimerTask.h" +#include "MsptiDataProcessBase.h" #include "MetricProcessBase.h" namespace dynolog_npu { namespace ipc_monitor { namespace metric { -class MetricManager : public ipc_monitor::Singleton, public TimerTask { +class MetricManager : public MsptiDataProcessBase { public: MetricManager(); ~MetricManager() = default; - ErrCode ConsumeMsptiData(msptiActivity *record); - void SetReportInterval(uint32_t intervalTimes); - void SendMetricMsg(); + ErrCode ConsumeMsptiData(msptiActivity *record) override; + void SetReportInterval(uint32_t intervalTimes) override; void ExecuteTask() override; - void EnableKindSwitch_(msptiActivityKind kind, bool flag); - void ReleaseResource() override; + void EnableKindSwitch(msptiActivityKind kind, bool flag) override; + void RunPostTask() override; + +private: + void SendMetricMsg(); private: std::vector> kindSwitchs_; std::vector> consumeStatus_; std::atomic reportInterval_; std::vector> metrics; }; -} -} -} -#endif \ No newline at end of file +} // namespace metric +} // namespace ipc_monitor +} // namespace dynolog_npu +#endif // METRIC_MANAGER_H diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/MsptiDataProcessBase.h b/msmonitor/plugin/ipc_monitor/mspti_monitor/MsptiDataProcessBase.h new file mode 100644 index 000000000..ce91db847 --- /dev/null +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/MsptiDataProcessBase.h @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2025-2025. 
Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef MSPTI_DATA_PROCESS_BASE_H +#define MSPTI_DATA_PROCESS_BASE_H + +#include "mspti.h" +#include "utils.h" +#include "TimerTask.h" + +namespace dynolog_npu { +namespace ipc_monitor { +class MsptiDataProcessBase : public TimerTask { +public: + explicit MsptiDataProcessBase(const std::string& name) : TimerTask(name, DEFAULT_FLUSH_INTERVAL) {} + ~MsptiDataProcessBase() = default; + void ExecuteTask() override {} + virtual void EnableKindSwitch(msptiActivityKind kind, bool flag) {} + virtual ErrCode ConsumeMsptiData(msptiActivity *record) { return ErrCode::SUC; } + virtual void SetReportInterval(uint32_t interval) {} + void SetClusterConfigData(const std::unordered_map& configData) + { + clusterConfigData = configData; + } + +protected: + std::unordered_map clusterConfigData; +}; +} // namespace ipc_monitor +} // namespace dynolog_npu +#endif // MSPTI_DATA_PROCESS_BASE_H diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/MsptiMonitor.cpp b/msmonitor/plugin/ipc_monitor/mspti_monitor/MsptiMonitor.cpp index 33abb8fe3..c3d36ed2c 100644 --- a/msmonitor/plugin/ipc_monitor/mspti_monitor/MsptiMonitor.cpp +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/MsptiMonitor.cpp @@ -22,6 +22,7 @@ #include "DynoLogNpuMonitor.h" #include "MetricManager.h" +#include "db/DBProcessManager.h" #include "utils.h" namespace { @@ -32,13 +33,6 @@ constexpr uint32_t 
MAX_ALLOC_CNT = MAX_BUFFER_SIZE / DEFAULT_BUFFER_SIZE; namespace dynolog_npu { namespace ipc_monitor { - -MsptiMonitor::MsptiMonitor() - : start_(false), - subscriber_(nullptr), - checkFlush_(false), - flushInterval_(0) {} - MsptiMonitor::~MsptiMonitor() { Uninit(); @@ -49,13 +43,27 @@ void MsptiMonitor::Start() if (start_.load()) { return; } + if (savePath_.empty()) { + std::shared_ptr metricManager{nullptr}; + MakeSharedPtr(metricManager); + dataProcessor_ = metricManager; + } else { + std::shared_ptr dbProcessManager{nullptr}; + MakeSharedPtr(dbProcessManager, savePath_); + dataProcessor_ = dbProcessManager; + } + if (dataProcessor_ == nullptr) { + LOG(ERROR) << "MsptiMonitor Start failed, dataProcessor init failed"; + return; + } SetThreadName("MsptiMonitor"); if (Thread::Start() != 0) { LOG(ERROR) << "MsptiMonitor start failed"; return; } start_.store(true); - metric::MetricManager::GetInstance()->Run(); + dataProcessor_->SetReportInterval(flushInterval_); + dataProcessor_->Run(); LOG(INFO) << "MsptiMonitor start successfully"; } @@ -65,10 +73,11 @@ void MsptiMonitor::Stop() LOG(WARNING) << "MsptiMonitor is not running"; return; } - Uninit(); + if (msptiActivityFlushAll(1) != MSPTI_SUCCESS) { LOG(WARNING) << "MsptiMonitor stop msptiActivityFlushAll failed"; } + Uninit(); LOG(INFO) << "MsptiMonitor stop successfully"; } @@ -77,10 +86,34 @@ void MsptiMonitor::Uninit() if (!start_.load()) { return; } - metric::MetricManager::GetInstance()->Stop(); start_.store(false); cv_.notify_one(); Thread::Stop(); + if (dataProcessor_ != nullptr) { + dataProcessor_->Stop(); + dataProcessor_ = nullptr; + } + savePath_.clear(); +} + +bool MsptiMonitor::CheckAndSetSavePath(const std::string &path) +{ + if (path.empty()) { + LOG(ERROR) << "MsptiMonitor CheckAndSetSavePath failed, path is empty"; + return false; + } + std::string absPath = PathUtils::RelativeToAbsPath(path); + if (PathUtils::DirPathCheck(absPath)) { + std::string realPath = PathUtils::RealPath(absPath); + if 
(PathUtils::CreateDir(realPath)) { + savePath_ = realPath; + return true; + } + LOG(ERROR) << "MsptiMonitor CheckAndSetSavePath failed, Create save path: " << realPath << " failed."; + } else { + LOG(ERROR) << "MsptiMonitor CheckAndSetSavePath failed, save path: " << absPath << " is invalid."; + } + return false; } void MsptiMonitor::EnableActivity(msptiActivityKind kind) @@ -92,7 +125,9 @@ void MsptiMonitor::EnableActivity(msptiActivityKind kind) } else { LOG(ERROR) << "MsptiMonitor enableActivity failed, kind: " << static_cast(kind); } - metric::MetricManager::GetInstance()->EnableKindSwitch_(kind, true); + if (dataProcessor_ != nullptr) { + dataProcessor_->EnableKindSwitch(kind, true); + } } } @@ -105,7 +140,9 @@ void MsptiMonitor::DisableActivity(msptiActivityKind kind) } else { LOG(ERROR) << "MsptiMonitor disableActivity failed, kind: " << static_cast(kind); } - metric::MetricManager::GetInstance()->EnableKindSwitch_(kind, false); + if (dataProcessor_ != nullptr) { + dataProcessor_->EnableKindSwitch(kind, false); + } } } @@ -116,7 +153,9 @@ void MsptiMonitor::SetFlushInterval(uint32_t interval) if (start_.load()) { cv_.notify_one(); } - metric::MetricManager::GetInstance()->SetReportInterval(interval); + if (dataProcessor_ != nullptr) { + dataProcessor_->SetReportInterval(interval); + } } bool MsptiMonitor::IsStarted() @@ -227,7 +266,22 @@ void MsptiMonitor::BufferConsume(msptiActivity *record) if (record == nullptr) { return; } - metric::MetricManager::GetInstance()->ConsumeMsptiData(record); + auto dataProcessor = GetDataProcessor(); + if (dataProcessor != nullptr) { + dataProcessor->ConsumeMsptiData(record); + } +} + +std::shared_ptr MsptiMonitor::GetDataProcessor() +{ + return GetInstance()->dataProcessor_; +} + +void MsptiMonitor::SetClusterConfigData(const std::unordered_map& configData) +{ + if (dataProcessor_ != nullptr) { + dataProcessor_->SetClusterConfigData(configData); + } } } // namespace ipc_monitor } // namespace dynolog_npu diff --git 
a/msmonitor/plugin/ipc_monitor/mspti_monitor/MsptiMonitor.h b/msmonitor/plugin/ipc_monitor/mspti_monitor/MsptiMonitor.h index d1b73e581..5e988948f 100644 --- a/msmonitor/plugin/ipc_monitor/mspti_monitor/MsptiMonitor.h +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/MsptiMonitor.h @@ -22,13 +22,14 @@ #include #include "mspti.h" #include "thread.h" +#include "singleton.h" +#include "MsptiDataProcessBase.h" namespace dynolog_npu { namespace ipc_monitor { -class MsptiMonitor : public Thread { +class MsptiMonitor : public Singleton, public Thread { public: - explicit MsptiMonitor(); virtual ~MsptiMonitor(); void Start(); void Stop(); @@ -38,25 +39,30 @@ public: bool IsStarted(); std::set GetEnabledActivities(); void Uninit(); + bool CheckAndSetSavePath(const std::string& path); + void SetClusterConfigData(const std::unordered_map& configData); private: static void BufferRequest(uint8_t **buffer, size_t *size, size_t *maxNumRecords); static void BufferComplete(uint8_t *buffer, size_t size, size_t validSize); static void BufferConsume(msptiActivity *record); + static std::shared_ptr GetDataProcessor(); static std::atomic allocCnt; private: void Run() override; private: - std::atomic start_; + std::atomic start_{false}; std::mutex cvMtx_; std::condition_variable cv_; - msptiSubscriberHandle subscriber_; + msptiSubscriberHandle subscriber_{nullptr}; std::mutex activityMtx_; std::set enabledActivities_; - std::atomic checkFlush_; - std::atomic flushInterval_; + std::atomic checkFlush_{false}; + std::atomic flushInterval_{0}; + std::string savePath_; + std::shared_ptr dataProcessor_{nullptr}; }; } // namespace ipc_monitor } // namespace dynolog_npu diff --git a/msmonitor/plugin/ipc_monitor/utils.cpp b/msmonitor/plugin/ipc_monitor/utils.cpp index e7278fda9..7c5c59249 100644 --- a/msmonitor/plugin/ipc_monitor/utils.cpp +++ b/msmonitor/plugin/ipc_monitor/utils.cpp @@ -91,9 +91,8 @@ std::string getCurrentTimestamp() uint64_t getCurrentTimestamp64() { auto now = 
std::chrono::system_clock::now(); - auto micros = std::chrono::duration_cast(now.time_since_epoch()); - auto milli_time = std::chrono::duration_cast(micros).count(); - return milli_time; + auto ns = std::chrono::duration_cast(now.time_since_epoch()); + return ns.count(); } std::string formatErrorCode(SubModule submodule, ErrCode errorCode) diff --git "a/\345\205\254\347\275\221URL\350\257\264\346\230\216.md" "b/\345\205\254\347\275\221URL\350\257\264\346\230\216.md" index 9dc294280..ff1792324 100644 --- "a/\345\205\254\347\275\221URL\350\257\264\346\230\216.md" +++ "b/\345\205\254\347\275\221URL\350\257\264\346\230\216.md" @@ -5,7 +5,10 @@ | 开源软件 | MindStudio Training Tools - msmonitor | /.gitmodules | 公网地址 | https://github.com/facebookincubator/dynolog.git | 在线监控底座 | | 开源软件 | MindStudio Training Tools - msmonitor | /msmonitor/dynolog_npu/cmake/config.ini | 公网地址 | https://gitee.com/mirrors/openssl.git | 开源软件下载 | | 开源软件 | MindStudio Training Tools - msmonitor | /msmonitor/scripts/build.sh | 公网地址 | https://github.com/RustingSword/tensorboard_logger.git | 开源软件下载 | -| 开源软件 | MindStudio Training Tools - msmonitor | /msmonitor/README.md | 公网地址 | https://github.com/tensorflow/tensorboard | tensorboard官网教程 | +| 开源软件 | MindStudio Training Tools - msmonitor | /msmonitor/docs/npumonitor.md | 公网地址 | https://github.com/tensorflow/tensorboard | tensorboard官网教程 | +| 开源软件 | MindStudio Training Tools - msmonitor | /msmonitor/plugin/cmake/config.ini | 公网地址 | https://gitee.com/mirrors/glog.git | 开源软件下载 | +| 开源软件 | MindStudio Training Tools - msmonitor | /msmonitor/plugin/cmake/config.ini | 公网地址 | https://gitee.com/mirrors/nlohmann-json.git | 开源软件下载 | +| 开源软件 | MindStudio Training Tools - msmonitor | /msmonitor/plugin/cmake/config.ini | 公网地址 | https://sqlite.org/2025/sqlite-amalgamation-3500300.zip | 开源软件下载 | | 开源软件 | MindStudio Training Tools - msprof-analyze advisor | /profiler/msprof_analyze/advisor/config/config.ini | 公网地址 | 
https://www.hiascend.com/document/detail/zh/canncommercial/80RC2/devaids/auxiliarydevtool/atlasprofiling_16_0038.html | MindStudio Ascend PyTorch Profiler参考示例 | | 开源软件 | MindStudio Training Tools - msprof-analyze advisor | /profiler/msprof_analyze/advisor/config/config.ini | 公网地址 | https://gitee.com/ascend/mstt/blob/master/profiler/msprof_analyze/advisor/doc/Samples%20of%20Fused%20Operator%20API%20Replacement.md" | Advisor优化手段参考示例 | | 开源软件 | MindStudio Training Tools - msprof-analyze advisor | /profiler/msprof_analyze/advisor/config/config.ini | 公网地址 | https://www.hiascend.com/document/detail/zh/canncommercial/80RC2/devaids/auxiliarydevtool/aoe_16_043.html | Advisor优化手段参考示例 | -- Gitee