-
Notifications
You must be signed in to change notification settings - Fork 101
Expand file tree
/
Copy pathplot_metrics.py
More file actions
144 lines (118 loc) · 4.46 KB
/
plot_metrics.py
File metadata and controls
144 lines (118 loc) · 4.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import pandas as pd
import json
from plotnine import *
def get_data_samples(file_path):
    """
    Read an NDJSON (newline-delimited JSON) file.

    Blank lines (e.g. a trailing newline at end of file) are skipped
    instead of raising a JSONDecodeError.

    :param file_path: Path to the NDJSON file
    :return: List of parsed JSON objects, one per non-empty line
    """
    json_list = []
    with open(file_path, "r", encoding="utf-8") as file:
        for line in file:
            line = line.strip()
            if line:  # skip empty lines rather than crash on them
                json_list.append(json.loads(line))
    return json_list
# Function to process the data
def parse_data(data_samples):
    """
    Flatten a list of metric samples into a tidy DataFrame.

    :param data_samples: list of samples (one per timestamp); each sample is
        a list of entries with a "key", a "value" holding either a "Gauge"
        or a "Counter" number, and optional "labels" as [name, value] pairs
    :return: DataFrame with columns timestamp/key/worker/level/value/id
    """
    records = []
    for timestamp, sample in enumerate(data_samples):
        for entry in sample:
            key = entry["key"]
            value_box = entry["value"]
            # Prefer Gauge, fall back to Counter. Membership test instead of
            # `.get("Gauge") or .get("Counter")` so that a legitimate Gauge
            # value of 0 (falsy) is not mistaken for "missing".
            if "Gauge" in value_box:
                value = value_box["Gauge"]
            else:
                value = value_box.get("Counter")
            # Labels arrive as [name, value] pairs; convert to a dict,
            # defaulting each dimension to "Total" when absent.
            labels = {name: val for name, val in entry.get("labels", [])}
            records.append(
                {
                    "timestamp": timestamp,  # index of the sample in the file
                    "key": key,
                    "worker": labels.get("worker", "Total"),
                    "level": labels.get("level", "Total"),
                    "value": value,
                    "id": labels.get("id", "Total"),
                }
            )
    return pd.DataFrame(records)
def make_plots(data_samples):
    """
    Build and save metric plots from parsed NDJSON samples.

    Writes ongoing_merges.png, batches_per_level.png and
    pipeline_totals.png to the current directory.

    :param data_samples: list of samples as returned by get_data_samples
    """
    df = parse_data(data_samples)

    # Per-level min/avg/max of ongoing merges, melted into long form
    # (one row per timestamp/level/stat) for plotting.
    df_merges = df[df["key"] == "spine.ongoing_merges"]
    df_merges_summary = (
        df_merges.groupby(["timestamp", "level"])["value"]
        .agg(["mean", "max", "min"])
        .reset_index()
        .melt(id_vars=["timestamp", "level"], var_name="stat", value_name="value")
    )

    # Same summary shape for batches per level.
    df_batches = df[df["key"] == "spine.batches_per_level"]
    df_batches_summary = (
        df_batches.groupby(["timestamp", "level"])["value"]
        .agg(["mean", "max", "min"])
        .reset_index()
        .melt(id_vars=["timestamp", "level"], var_name="stat", value_name="value")
    )

    # Disk throughput: differentiate the cumulative bytes-written counter
    # into MiB per sample interval. `.copy()` avoids pandas'
    # SettingWithCopyWarning when assigning into the filtered frame.
    df_disk = df[df["key"] == "disk.total_bytes_written"].copy()
    df_disk = df_disk.sort_values("timestamp")
    df_disk["value"] = df_disk["value"].diff() / (1024 * 1024)
    df_disk = df_disk.dropna()  # first row has no diff
    df_disk["metric"] = "Writes MiB/s"

    # Totals summed over all workers/levels at each timestamp.
    df_merges_total = df_merges.groupby("timestamp")["value"].sum().reset_index()
    df_merges_total["worker"] = "Total"
    df_merges_total["level"] = "Total"
    df_merges_total["metric"] = "Current #Batches being merged"
    df_batches_total = df_batches.groupby("timestamp")["value"].sum().reset_index()
    df_batches_total["worker"] = "Total"
    df_batches_total["level"] = "Total"
    df_batches_total["metric"] = "Current #Batches not being merged"

    # The faceted plots show only the melted summaries; the totals are
    # combined into one frame for the final figure. (The previous
    # single-element pd.concat wrappers were no-ops.)
    df_merges = df_merges_summary
    df_batches = df_batches_summary
    df_totals = pd.concat([df_disk, df_merges_total, df_batches_total])

    def create_plot(df, title, filename):
        # One faceted line chart per level; y-axis clamped at 0.
        plot = (
            ggplot(df, aes(x="timestamp", y="value", color="stat", group="stat"))
            + geom_line(size=1)
            + facet_wrap("~level", scales="free")
            + labs(title=title, x="Time", y="Value")
            + theme_classic()
            + scale_y_continuous(limits=(0, None))
        )
        plot.save(filename, width=12, height=6, dpi=300)
        # Fixed: the f-string previously had no placeholder, so the saved
        # path was never actually reported.
        print(f"Saved {filename}")

    # Generate the two per-level summary plots.
    create_plot(
        df_merges,
        "Current #Batches being merged (Min/Avg/Max from all Spines)",
        "ongoing_merges.png",
    )
    create_plot(
        df_batches,
        "Current #Batches not being merged (Min/Avg/Max from all Spine)",
        "batches_per_level.png",
    )

    # Combined totals figure: one row per metric with independent y scales.
    plot = (
        ggplot(df_totals, aes(x="timestamp", y="value", color="metric"))
        + geom_line(size=1)
        + facet_grid(
            "metric ~ .", scales="free_y"
        )  # Separate plots for MiB/s and Counts
        + labs(title="Pipeline Totals", x="Time", y="Value")
        + theme_classic()
        + scale_y_continuous(limits=(0, None))
    )
    # Save the plot
    plot.save("pipeline_totals.png", width=12, height=8, dpi=300)
if __name__ == "__main__":
    import sys

    # Guard against a missing argument: previously this raised a bare
    # IndexError instead of telling the user how to invoke the script.
    if len(sys.argv) != 2:
        sys.exit(f"usage: {sys.argv[0]} <metrics.ndjson>")
    samples = get_data_samples(sys.argv[1])
    make_plots(samples)