Source code for perun.processing

"""Processing Module."""

import copy
import logging
from configparser import ConfigParser
from datetime import datetime
from itertools import chain
from typing import Any, Dict, List, Optional, Tuple

import numpy as np

from perun.data_model.data import (
    AggregateType,
    DataNode,
    Metric,
    MetricType,
    NodeType,
    RawData,
    Region,
    Stats,
)
from perun.data_model.measurement_type import Magnitude, MetricMetaData, Number, Unit
from perun.data_model.sensor import DeviceType

log = logging.getLogger(__name__)


[docs] def processEnergyData( raw_data: RawData, start: Optional[Number] = None, end: Optional[Number] = None, ) -> Tuple[Any, Any]: """Calculate total energy and average power from an energy or power time series. Using the start and end parameters the results can be limited to certain areas of the application run. Parameters ---------- raw_data : RawData Raw Data from sensor start : Optional[Number], optional Start time of region, by default None end : Optional[Number], optional End time of region, by default None Returns ------- _type_ Tuple with total energy in joules and avg power in watts. """ t_s: np.ndarray = raw_data.timesteps.astype("float32") t_s *= raw_data.t_md.mag.value / Magnitude.ONE.value magFactor = raw_data.v_md.mag.value / Magnitude.ONE.value if raw_data.v_md.unit == Unit.JOULE: # If getting energy, transform to power e_J = raw_data.values maxValue = raw_data.v_md.max dtype = raw_data.v_md.dtype.name d_energy = np.diff(e_J) if "uint" in dtype: idx = d_energy >= maxValue max_dtype = np.iinfo(dtype).max d_energy[idx] = maxValue + d_energy[idx] - max_dtype else: idx = d_energy <= 0 d_energy[idx] = d_energy[idx] + maxValue d_energy = d_energy.astype("float32") # Transform the energy series to a power series power_W = d_energy / np.diff(t_s) power_W = np.insert(power_W, 0, power_W[0]) power_W *= magFactor raw_data.alt_values = e_J raw_data.alt_v_md = raw_data.v_md raw_data.values = power_W raw_data.v_md = MetricMetaData( Unit.WATT, Magnitude.ONE, np.dtype("float32"), np.float32(0), np.finfo("float32").max, np.float32(-1), ) elif raw_data.v_md.unit == Unit.WATT: power_W = raw_data.values.astype("float32") * magFactor if start and end: t_s, power_W = getInterpolatedValues(t_s, power_W, start, end) avg_power_W = np.mean(power_W) if np.__version__[0] == "1": energy_J = np.trapz(power_W, x=t_s) else: energy_J = np.trapezoid(power_W, x=t_s) return energy_J, avg_power_W
[docs] def processSensorData(sensorData: DataNode) -> DataNode: """Calculate metrics based on raw values. Parameters ---------- sensorData : DataNode DataNode with raw sensor data. Returns ------- DataNode DataNode with computed metrics. """ if sensorData.type == NodeType.SENSOR and sensorData.raw_data: rawData = sensorData.raw_data runtime: float = rawData.timesteps[-1].item() sensorData.metrics[MetricType.RUNTIME] = Metric( MetricType.RUNTIME, runtime, rawData.t_md, AggregateType.MAX ) if rawData.v_md.unit == Unit.JOULE or rawData.v_md.unit == Unit.WATT: energy_J, power_W = processEnergyData(rawData) energyMetric = Metric( MetricType.ENERGY, energy_J, MetricMetaData( Unit.JOULE, Magnitude.ONE, np.dtype("float32"), np.float32(0), np.finfo("float32").max, np.float32(-1), ), AggregateType.SUM, ) powerMetric = Metric( MetricType.POWER, power_W, MetricMetaData( Unit.WATT, Magnitude.ONE, np.dtype("float32"), np.float32(0), np.finfo("float32").max, np.float32(-1), ), AggregateType.SUM, ) sensorData.metrics[MetricType.ENERGY] = energyMetric sensorData.metrics[MetricType.POWER] = powerMetric if sensorData.deviceType == DeviceType.CPU: sensorData.metrics[MetricType.CPU_ENERGY] = energyMetric.copy() sensorData.metrics[MetricType.CPU_ENERGY].type = MetricType.CPU_ENERGY sensorData.metrics[MetricType.CPU_POWER] = powerMetric.copy() sensorData.metrics[MetricType.CPU_POWER].type = MetricType.CPU_POWER elif sensorData.deviceType == DeviceType.GPU: sensorData.metrics[MetricType.GPU_ENERGY] = energyMetric.copy() sensorData.metrics[MetricType.GPU_ENERGY].type = MetricType.GPU_ENERGY sensorData.metrics[MetricType.GPU_POWER] = powerMetric.copy() sensorData.metrics[MetricType.GPU_POWER].type = MetricType.GPU_POWER elif sensorData.deviceType == DeviceType.RAM: sensorData.metrics[MetricType.DRAM_ENERGY] = energyMetric.copy() sensorData.metrics[MetricType.DRAM_ENERGY].type = MetricType.DRAM_ENERGY sensorData.metrics[MetricType.DRAM_POWER] = powerMetric.copy() sensorData.metrics[MetricType.DRAM_POWER].type = MetricType.DRAM_POWER elif sensorData.deviceType == DeviceType.OTHER: sensorData.metrics[MetricType.OTHER_ENERGY] = energyMetric.copy() sensorData.metrics[ MetricType.OTHER_ENERGY ].type = MetricType.OTHER_ENERGY sensorData.metrics[MetricType.OTHER_POWER] = powerMetric.copy() sensorData.metrics[MetricType.OTHER_POWER].type = MetricType.OTHER_POWER elif rawData.v_md.unit == Unit.PERCENT: if sensorData.deviceType == DeviceType.CPU: metricType = MetricType.CPU_UTIL elif sensorData.deviceType == DeviceType.GPU: metricType = MetricType.GPU_UTIL else: metricType = MetricType.OTHER_UTIL sensorData.metrics[metricType] = Metric( metricType, np.mean(rawData.values), rawData.v_md, AggregateType.MEAN, ) elif rawData.v_md.unit == Unit.BYTE: bytes_v = rawData.values if sensorData.deviceType == DeviceType.NET: if "READ" in sensorData.id: metricType = MetricType.NET_READ else: metricType = MetricType.NET_WRITE d_bytes = bytes_v[1:] - bytes_v[:-1] result = d_bytes.sum() aggType = AggregateType.SUM elif sensorData.deviceType == DeviceType.DISK: if "READ" in sensorData.id: metricType = MetricType.DISK_READ else: metricType = MetricType.DISK_WRITE d_bytes = bytes_v[1:] - bytes_v[:-1] result = d_bytes.sum() aggType = AggregateType.SUM elif sensorData.deviceType == DeviceType.GPU: metricType = MetricType.GPU_MEM result = bytes_v.mean() aggType = AggregateType.SUM elif sensorData.deviceType == DeviceType.RAM: metricType = MetricType.DRAM_MEM result = bytes_v.mean() aggType = AggregateType.SUM else: metricType = MetricType.OTHER_MEM result = bytes_v.mean() aggType = AggregateType.SUM sensorData.metrics[metricType] = Metric( metricType, result.astype(rawData.v_md.dtype), rawData.v_md, aggType ) sensorData.processed = True return sensorData
[docs] def processDataNode( dataNode: DataNode, perunConfig: ConfigParser, force_process: bool = False ) -> DataNode: """Recursively calculate metrics on the dataNode tree. Parameters ---------- dataNode : DataNode Root data node tree. perunConfig: ConfigParser Perun configuration force_process : bool, optional Force recomputation of child node metrics, by default False Returns ------- DataNode Data node with computed metrics. """ # Regions if dataNode.regions: start = datetime.now() unprocessedRegions = [] for region in dataNode.regions.values(): if not region.processed: addRunAndRuntimeInfoToRegion(region) region.processed = True unprocessedRegions.append(region) processRegionsWithSensorData(unprocessedRegions, dataNode) duration = datetime.now() - start log.info(f"Region processing duration: {duration}") aggregatedMetrics: Dict[MetricType, List[Metric]] = {} for _, subNode in dataNode.nodes.items(): # Make sure sub nodes have their metrics ready if not subNode.processed or force_process: if subNode.type == NodeType.SENSOR: subNode = processSensorData(subNode) else: subNode = processDataNode( subNode, perunConfig=perunConfig, force_process=force_process ) if dataNode.type == NodeType.APP: for subSubNode in subNode.nodes.values(): for metricType, metric in subSubNode.metrics.items(): if isinstance(metric, Metric): if metricType in aggregatedMetrics: aggregatedMetrics[metricType].append(metric) else: aggregatedMetrics[metricType] = [metric] else: for metricType, metric in subNode.metrics.items(): if isinstance(metric, Metric): if metricType in aggregatedMetrics: aggregatedMetrics[metricType].append(metric) else: aggregatedMetrics[metricType] = [metric] for metricType, metrics in aggregatedMetrics.items(): aggType = metrics[0].agg metric_md = metrics[0].metric_md if dataNode.type == NodeType.MULTI_RUN or dataNode.type == NodeType.APP: dataNode.metrics[metricType] = Stats.fromMetrics(metrics) else: if aggType == AggregateType.MEAN: aggregatedValue = np.array([metric.value for metric in metrics]).mean() elif aggType == AggregateType.MAX: aggregatedValue = np.array([metric.value for metric in metrics]).max() elif aggType == AggregateType.MIN: aggregatedValue = np.array([metric.value for metric in metrics]).min() else: aggregatedValue = np.array([metric.value for metric in metrics]).sum() dataNode.metrics[metricType] = Metric( metricType, aggregatedValue, metric_md, aggType ) # Apply power overhead to each computational node if there is power data available. if dataNode.type == NodeType.NODE and MetricType.POWER in dataNode.metrics: power_overhead = perunConfig.getfloat("post-processing", "power_overhead") dataNode.metrics[MetricType.POWER].value += power_overhead runtime = dataNode.metrics[MetricType.RUNTIME].value dataNode.metrics[MetricType.ENERGY].value += runtime * power_overhead # If there is energy data, apply PUE, and convert to currency and CO2 emmisions. if dataNode.type == NodeType.RUN and MetricType.ENERGY in dataNode.metrics: pue = perunConfig.getfloat("post-processing", "pue") emissions_factor = perunConfig.getfloat("post-processing", "emissions_factor") price_factor = perunConfig.getfloat("post-processing", "price_factor") total_energy = dataNode.metrics[MetricType.ENERGY].value * pue dataNode.metrics[MetricType.ENERGY].value = total_energy e_kWh = total_energy / (3600 * 1e3) costMetric = Metric( MetricType.MONEY, e_kWh * price_factor, MetricMetaData( Unit.SCALAR, Magnitude.ONE, np.dtype("float32"), np.float32(0), np.finfo("float32").max, np.float32(0), ), AggregateType.SUM, ) co2Emissions = Metric( MetricType.CO2, e_kWh * emissions_factor, MetricMetaData( Unit.GRAM, Magnitude.ONE, np.dtype("float32"), np.float32(0), np.finfo("float32").max, np.float32(0), ), AggregateType.SUM, ) dataNode.metrics[MetricType.MONEY] = costMetric dataNode.metrics[MetricType.CO2] = co2Emissions dataNode.processed = True return dataNode
[docs] def processRegionsWithSensorData(regions: List[Region], dataNode: DataNode) -> None: """Complete region information using sensor data found on the data node (in place op). Parameters ---------- regions : List[Region] List of regions that use the same data node. dataNode : DataNode Data node with sensor data. """ log.debug(f"Processing regions with sensor data: {len(regions)}") power = [ { rank: [0.0 for _ in range(region.raw_data[rank].shape[0] // 2)] for rank in region.raw_data.keys() } for region in regions ] cpu_util = copy.deepcopy(power) dram_mem = copy.deepcopy(power) gpu_mem = copy.deepcopy(power) has_gpu = False for hostNode in dataNode.nodes.values(): # Get relevant ranks ranks = hostNode.metadata["mpi_ranks"] for deviceNode in hostNode.nodes.values(): if ( deviceNode.deviceType == DeviceType.CPU or deviceNode.deviceType == DeviceType.GPU or deviceNode.deviceType == DeviceType.RAM ): for sensorNode in deviceNode.nodes.values(): if sensorNode.raw_data: raw_data = sensorNode.raw_data measuring_unit = raw_data.v_md.unit for region_idx, region in enumerate(regions): for rank in ranks: if rank in region.raw_data: events = region.raw_data[rank] for i in range(events.shape[0] // 2): if ( measuring_unit == Unit.JOULE or measuring_unit == Unit.WATT ): _, power_W = processEnergyData( raw_data, events[i * 2].item(), events[i * 2 + 1].item(), ) power[region_idx][rank][i] += power_W elif ( measuring_unit == Unit.PERCENT and deviceNode.deviceType == DeviceType.CPU ): _, values = getInterpolatedValues( raw_data.timesteps.astype("float32"), raw_data.values, events[i * 2].item(), events[i * 2 + 1].item(), ) cpu_util[region_idx][rank][i] += np.mean( values, dtype="float32" ) elif ( measuring_unit == Unit.BYTE and deviceNode.deviceType == DeviceType.RAM ): _, values = getInterpolatedValues( raw_data.timesteps.astype("float32"), raw_data.values, events[i * 2].item(), events[i * 2 + 1].item(), ) dram_mem[region_idx][rank][i] += ( np.mean(values) ).astype("float32") elif ( measuring_unit == Unit.BYTE and deviceNode.deviceType == DeviceType.GPU ): has_gpu = True _, values = getInterpolatedValues( raw_data.timesteps.astype("float32"), raw_data.values, events[i * 2].item(), events[i * 2 + 1].item(), ) gpu_mem[region_idx][rank][i] += ( np.mean(values) ).astype("float32") for region_idx, region in enumerate(regions): r_power = np.array(list(chain(*power[region_idx].values()))) r_cpu_util = np.array(list(chain(*cpu_util[region_idx].values()))) r_gpu_mem = np.array(list(chain(*gpu_mem[region_idx].values()))) r_dram_mem = np.array(list(chain(*dram_mem[region_idx].values()))) region.metrics[MetricType.CPU_UTIL] = Stats( MetricType.CPU_UTIL, MetricMetaData( Unit.PERCENT, Magnitude.ONE, np.dtype("float32"), np.float32(0), np.float32(100), np.float32(-1), ), r_cpu_util.sum(), r_cpu_util.mean(), r_cpu_util.std(), r_cpu_util.max(), r_cpu_util.min(), ) region.metrics[MetricType.DRAM_MEM] = Stats( MetricType.DRAM_MEM, MetricMetaData( Unit.BYTE, Magnitude.ONE, np.dtype("uint64"), np.uint64(0), np.uint64(np.iinfo("uint64").max), np.uint64(0), ), r_dram_mem.sum(), r_dram_mem.mean(), r_dram_mem.std(), r_dram_mem.max(), r_dram_mem.min(), ) region.metrics[MetricType.POWER] = Stats( MetricType.POWER, MetricMetaData( Unit.WATT, Magnitude.ONE, np.dtype("float32"), np.float32(0), np.float32(np.finfo("float32").max), np.float32(-1), ), r_power.sum(), r_power.mean(), r_power.std(), r_power.max(), r_power.min(), ) if has_gpu: region.metrics[MetricType.GPU_MEM] = Stats( MetricType.GPU_MEM, MetricMetaData( Unit.BYTE, Magnitude.ONE, np.dtype("uint64"), np.uint64(0), np.uint64(np.iinfo("uint64").max), np.uint64(0), ), r_gpu_mem.sum(), r_gpu_mem.mean(), r_gpu_mem.std(), r_gpu_mem.max(), r_gpu_mem.min(), )
[docs] def addRunAndRuntimeInfoToRegion(region: Region) -> None: """Process run and runtime stats in region objects (in place operation). Parameters ---------- region : Region Region object """ runs_per_rank = [] runtime = [] for rank in region.raw_data.keys(): events = region.raw_data[rank] runs_per_rank.append(events.shape[0] / 2) for i in range(1, events.shape[0], 2): runtime.append(events[i] - events[i - 1]) runs_array = np.array(runs_per_rank) runtime_array = np.array(runtime) region.runs_per_rank = Stats( MetricType.N_RUNS, MetricMetaData( Unit.SCALAR, Magnitude.ONE, np.dtype("float32"), np.float32(0), np.finfo("float32").max, np.float32(-1), ), runs_array.sum(), runs_array.mean(), runs_array.std(), runs_array.max(), runs_array.min(), ) region.metrics[MetricType.RUNTIME] = Stats( MetricType.RUNTIME, MetricMetaData( Unit.SECOND, Magnitude.ONE, np.dtype("float32"), np.float32(0), np.finfo("float32").max, np.float32(-1), ), runtime_array.sum(), runtime_array.mean(), runtime_array.std(), runtime_array.max(), runtime_array.min(), )
[docs] def getInterpolatedValues( t: np.ndarray, x: np.ndarray, start: Number, end: Number ) -> Tuple[np.ndarray, np.ndarray]: """Extract a time range out of a time series, and interpolate the values at the edges. Parameters ---------- t : np.ndarray Original time steps x : np.ndarray Original values start : Number Start of the roi end : Number End of the roi Returns ------- Tuple[np.ndarray, np.ndarray] Tuple with the new time steps and values. """ new_t = np.concatenate([[start], t[np.all([t >= start, t <= end], axis=0)], [end]]) new_x = np.interp(new_t, t, x) return new_t, new_x