]> begriffs open source - cmsis-freertos/blob - Test/litani/lib/graph.py
Update README.md - branch main is now the base branch
[cmsis-freertos] / Test / litani / lib / graph.py
1 # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License").
4 # You may not use this file except in compliance with the License.
5 # A copy of the License is located at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # or in the "license" file accompanying this file. This file is distributed
10 # on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11 # express or implied. See the License for the specific language governing
12 # permissions and limitations under the License.
13
14
15 import dataclasses
16 import os
17 import pathlib
18 import re
19 import textwrap
20
21 import lib.litani
22 import lib.litani_report
23
24
25
class Node:
    """Base class providing the string-escaping helpers used by graph nodes."""

    @staticmethod
    def escape(string):
        """Return *string* with double quotes and semicolons backslash-escaped.

        The node classes use this for attribute values that they emit
        between double quotes.
        """
        return string.replace('"', '\\"').replace(';', '\\;')


    @staticmethod
    def html_escape(string):
        """Return *string* with ``&``, ``"``, ``<`` and ``>`` as HTML entities.

        The ampersand has to be replaced first; otherwise the ampersands
        introduced by the other entities would be escaped a second time.
        """
        for char, entity in (
                ('&', '&amp;'),
                ('"', '&quot;'),
                ('<', '&lt;'),
                ('>', '&gt;'),
        ):
            string = string.replace(char, entity)
        return string
50
51
class DependencyNode(Node):
    """Graph node for a file that a job consumes or produces."""

    def __init__(self, fyle, line_width=40, **style):
        """Create the node for the file *fyle*.

        Args:
            fyle: path of the file; its hash becomes the node id.
            line_width: maximum width of each line of the node label.
            **style: additional attributes emitted for the node. A
                ``label`` entry is always overwritten.
        """
        self.file = fyle
        self.id = hash(fyle)
        self.style = style

        # Path.name already includes the extension, so the label is simply
        # the wrapped file name. Appending the extension once more would
        # render "foo.txt" as "foo.txt.txt".
        file_name = pathlib.Path(fyle).name
        self.style["label"] = "\n".join(
            textwrap.wrap(file_name, width=line_width))


    def __hash__(self):
        return self.id


    def __eq__(self, other):
        # Nodes are identified by id alone; anything that is not a node is
        # left to Python's default comparison instead of raising.
        if not isinstance(other, Node):
            return NotImplemented
        return self.id == other.id


    def __str__(self):
        """Return the statement declaring this node and its attributes."""
        attributes = ",".join(
            f'"{key}"="{Node.escape(value)}"'
            for key, value in self.style.items())
        return f'"{self.id}" [{attributes}];'
79
80
81
class CommandNode(Node):
    """Graph node for a job, rendered as a table inside an HTML-like label."""

    def __init__(
            self, pipeline_name, description, command, line_width=40, **style):
        """Create the node for one job.

        Args:
            pipeline_name: text shown in the first cell of the header row.
            description: optional text shown next to the pipeline name; a
                falsy value omits that cell.
            command: text shown in the second row; its hash is the node id.
            line_width: maximum width of each wrapped line of text.
            **style: additional attributes emitted for the node. A
                ``shape`` entry is always overwritten with ``plain``.
        """
        self.id = hash(command)
        self.pipeline_name = self._wrap_html(pipeline_name, line_width)
        if description:
            self.description = self._wrap_html(description, line_width)
        else:
            self.description = ""
        self.command = self._wrap_html(command, line_width)

        self.style = style
        self.style["shape"] = "plain"


    @staticmethod
    def _wrap_html(text, line_width):
        """Wrap *text* to *line_width* columns and return it as escaped HTML.

        The raw text is wrapped first and every resulting line is escaped
        afterwards. Escaping first would let the wrapper split an
        over-long word in the middle of an entity such as ``&lt;``.
        """
        return '<BR/>'.join(
            Node.html_escape(line)
            for line in textwrap.wrap(text, width=line_width))


    def __hash__(self):
        return self.id


    def __eq__(self, other):
        # Nodes are identified by id alone; anything that is not a node is
        # left to Python's default comparison instead of raising.
        if not isinstance(other, Node):
            return NotImplemented
        return self.id == other.id


    def __str__(self):
        """Return the statement declaring this node with its table label."""
        if self.description:
            desc_cell = f"\n<TD><B>{self.description}</B></TD>"
        else:
            desc_cell = ""
        style = ",".join(
            f'{key}="{Node.escape(value)}"'
            for key, value in self.style.items())
        return '''"{id}" [label=<
            <TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0">
                <TR>
                    <TD><B>{pipeline_name}</B></TD>{desc_cell}
                </TR>
                <TR>
                    <TD COLSPAN="2">{command}</TD>
                </TR>
            </TABLE>> {style}];'''.format(
                id=self.id, command=self.command,
                desc_cell=desc_cell,
                pipeline_name=self.pipeline_name,
                style=style)
129
130
131
class Edge:
    """Directed edge from the node *src* to the node *dst*."""

    def __init__(self, src, dst, **style):
        """Store the endpoints and any attributes to emit for the edge."""
        self.src = src
        self.dst = dst
        self.style = style


    def __eq__(self, other):
        # Only another edge can be equal; other operands fall back to
        # Python's default comparison instead of raising AttributeError.
        if not isinstance(other, Edge):
            return NotImplemented
        return self.src == other.src and self.dst == other.dst


    def __hash__(self):
        # Hash the ordered pair so that a -> b and b -> a do not collide.
        return hash((self.src, self.dst))


    def __str__(self):
        """Return the statement connecting the two node ids."""
        attributes = ",".join(
            f'"{key}"="{value}"' for key, value in self.style.items())
        return f'"{self.src.id}" -> "{self.dst.id}" [{attributes}];'
151
152
153
@dataclasses.dataclass
class SinglePipelineGraph:
    """Dependency graph of the jobs of a single pipeline."""

    # Pipeline description; read as pipe["ci_stages"][i]["jobs"].
    pipe: dict
    # Nodes and edges collected by build().
    nodes: set = dataclasses.field(default_factory=set)
    edges: set = dataclasses.field(default_factory=set)


    def iter_jobs(self):
        """Yield every job of every stage, in the order they are stored."""
        for stage in self.pipe["ci_stages"]:
            yield from stage["jobs"]


    def build(self):
        """Populate ``nodes`` and ``edges`` from the jobs of the pipeline.

        Each job contributes one command node, one dependency node per
        input and output file, and the edges connecting them. Missing or
        empty ``inputs``/``outputs`` entries contribute nothing.
        """
        for job in self.iter_jobs():
            args = job["wrapper_arguments"]
            cmd_node = self._make_cmd_node(
                job["complete"], job.get("outcome", None),
                args["pipeline_name"], args["description"], args["command"])
            self.nodes.add(cmd_node)

            # The node classes live in this very module, so they are
            # referenced directly rather than through the package path.
            for inputt in args.get("inputs") or []:
                in_node = DependencyNode(inputt)
                self.nodes.add(in_node)
                self.edges.add(Edge(src=in_node, dst=cmd_node))

            for output in args.get("outputs") or []:
                out_node = DependencyNode(output)
                self.nodes.add(out_node)
                self.edges.add(Edge(src=cmd_node, dst=out_node))


    @staticmethod
    def _make_cmd_node(complete, outcome, pipeline_name, description, command):
        """Return a filled command node colored according to the job state.

        Raises:
            RuntimeError: if the job is complete but *outcome* is not one
                of ``success``, ``fail_ignored`` or ``fail``.
        """
        outcome_colors = {
            "success": "#90caf9",
            "fail_ignored": "#ffecb3",
            "fail": "#ef9a9a",
        }
        if not complete:
            fillcolor = "#eceff1"
        elif outcome in outcome_colors:
            fillcolor = outcome_colors[outcome]
        else:
            raise RuntimeError("Unknown outcome '%s'" % outcome)
        cmd_style = {"style": "filled", "fillcolor": fillcolor}
        return CommandNode(
            pipeline_name, description, command, **cmd_style)


    def as_dot(self):
        """Return the collected graph as DOT source text."""
        buf = ["digraph G {", 'bgcolor="transparent"']
        buf.extend(f"  {node!s}" for node in self.nodes)
        buf.extend(f"  {edge!s}" for edge in self.edges)
        buf.append("}")
        return "\n".join(buf)


    @staticmethod
    def render(pipe):
        """Build the graph for *pipe* and return its DOT source text."""
        spg = SinglePipelineGraph(pipe)
        spg.build()
        return spg.as_dot()
217
218
219
@dataclasses.dataclass
class PipelineChooser:
    """Filter deciding which pipelines are left out of the graph."""

    # Names of the selected pipelines; a falsy value selects every pipeline.
    pipelines: list


    def should_skip(self, pipeline):
        """Return True if *pipeline* is excluded by the selection.

        Always returns a real bool: False when no selection was given,
        otherwise whether *pipeline* is missing from the selection.
        """
        return bool(self.pipelines) and pipeline not in self.pipelines
227
228
229
@dataclasses.dataclass
class Graph:
    """DOT rendering of the jobs of every selected pipeline of a run."""

    # Run description; read as run["pipelines"][i] with "name" and
    # "ci_stages" entries.
    run: dict
    # Filter consulted with each pipeline name to decide whether to skip it.
    pipeline_chooser: PipelineChooser


    def iter_jobs(self):
        """Yield the jobs of all pipelines that the chooser does not skip."""
        for pipe in self.run["pipelines"]:
            if self.pipeline_chooser.should_skip(pipe["name"]):
                continue
            for stage in pipe["ci_stages"]:
                yield from stage["jobs"]


    def __str__(self):
        """Return the graph as DOT source text.

        Each job contributes one command node plus one dependency node and
        edge per input and output file. Missing or empty ``inputs`` and
        ``outputs`` entries contribute nothing.
        """
        nodes = set()
        edges = set()

        for job in self.iter_jobs():
            args = job["wrapper_arguments"]
            cmd_node = CommandNode(
                args["pipeline_name"], args["description"], args["command"])
            nodes.add(cmd_node)

            # .get() tolerates jobs that declare no outputs or inputs at all.
            for output in args.get("outputs") or []:
                out_node = DependencyNode(output)
                nodes.add(out_node)
                edges.add(Edge(src=cmd_node, dst=out_node))

            for inputt in args.get("inputs") or []:
                in_node = DependencyNode(inputt)
                nodes.add(in_node)
                edges.add(Edge(src=in_node, dst=cmd_node))

        buf = ["digraph G {"]
        buf.extend(f"  {node!s}" for node in nodes)
        buf.extend(f"  {edge!s}" for edge in edges)
        buf.append("}")
        return "\n".join(buf)
271
272
273
async def print_graph(args):
    """Print the DOT graph of the current run to standard output.

    Only the pipelines named in ``args.pipelines`` are included; a falsy
    value includes all of them.
    """
    lib.litani.add_jobs_to_cache()

    # Load the run data from the cache directory reported by litani.
    cache_dir = lib.litani.get_cache_dir()
    run_data = lib.litani_report.get_run_data(cache_dir)

    chooser = PipelineChooser(args.pipelines)
    print(Graph(run=run_data, pipeline_chooser=chooser))