Skip to content

KDA

Contains the Kemeny Decomposition Algorithm (KDA) class.

KDA

Kemeny Decomposition Algorithm (KDA) class.

KDA will iteratively cut 'edges' to decompose an original Markov chain (MC). An edge corresponds to a MC transition probability. The notation of Berkhout and Heidergott (2019) is used.

Attributes:

  • MC (MarkovChain) –

    Current Markov chain during KDA.

  • log (dict) –

    Dictionary which logs the edges cut by KDA during the iterations. It also logs the Markov chains after each iteration. Each iteration of the inner-loop is stored by appending the list log['edges cut']. It is initialized with [None] to make the indexing easier. The Markov chains are stored in log['Markov chains'], where the original/initial MC is stored at index 0.

Methods:

  • run

    This will run KDA.

  • cut_edges

    Allows one to cut manual edges in the current Markov chain to create a new Markov chain after normalization

Source code in pykda\KDA.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
class KDA:
    """
    Kemeny Decomposition Algorithm (KDA) class.

    KDA will iteratively cut 'edges' to decompose an original Markov chain (MC).
    An edge corresponds to a MC transition probability. The notation of Berkhout
    and Heidergott (2019) is used.

    Attributes
    ----------
    MC : MarkovChain
        Current Markov chain during KDA.
    log : dict
        Dictionary which logs the edges cut by KDA during the iterations. It
        also logs the Markov chains after each iteration. Each iteration of
        the inner-loop is stored by appending the list log['edges cut']. It is
        initialized with [None] to make the indexing easier. The Markov chains
        are stored in log['Markov chains'], where the original/initial MC is
        stored at index 0.

    Methods
    -------
    run()
        This will run KDA.
    cut_edges(*args)
        Allows one to cut manual edges in the current Markov chain to create
        a new Markov chain after normalization

    """

    def __init__(
        self,
        original_MC: MarkovChain,
        CO_A: str = "CO_A_1(1)",
        CO_B: str = "CO_B_3(0)",
        symmetric_cut: bool = False,
        verbose: bool = False,
        normalizer: _normalizer_type = standard_row_normalization,
    ):
        """
        Parameters
        ----------
        original_MC : MarkovChain
            Original Markov chain object to which KDA will be applied.
        CO_A : str
            Condition of how often the Kemeny constant derivatives are being
            recalculated (outer while loop in KDA). The options are:

                - 'CO_A_1(i)' = Number of times performed < i
                - 'CO_A_2(E)' = Number of ergodic classes in current MC is < E
                - 'CO_A_3(C)' = Number of strongly connected components in
                                current MC is < C
        CO_B : str
            Condition of how many edges are being cut per iteration (inner while
            loop in KDA). The options are:

                - 'CO_B_1(e)' = Number of edges cut is < e
                - 'CO_B_2(E)' = Number of ergodic classes in MC is < E
                - 'CO_B_3(q)' = Not all edges with MC.KDer < q are cut
        symmetric_cut : bool
            If True, cutting (i, j) will also cut (j, i). If False, only (i, j).
        verbose : bool
            If true, information will be printed, else not.
        normalizer : normalizer_type
            Normalizer used to create a stochastic matrix from a matrix.
        """

        self.MC = original_MC.copy()
        self.log = {  # also log original MC after "iteration 0"
            "edges cut": [[None]],
            "Markov chains": [self.MC.copy()],
        }
        self.CO_A = CO_A
        self.CO_A_type = CO_A[:6]
        self.num_in_CO_A = self.get_num_in_str_brackets(CO_A)
        self.CO_B = CO_B
        self.CO_B_type = CO_B[:6]
        self.num_in_CO_B = self.get_num_in_str_brackets(CO_B)
        self.symmetric_cut = symmetric_cut
        self.normalizer = normalizer
        self.verbose = verbose

    @staticmethod
    def get_num_in_str_brackets(s):
        """Returns the number given between brackets in string s."""

        return int(s[s.find("(") + 1 : s.find(")")])

    def condition_A(self) -> bool:
        """Returns whether condition A is True or False."""

        if self.CO_A_type == "CO_A_1":
            return self.iterations_count < self.num_in_CO_A

        if self.CO_A_type == "CO_A_2":
            return self.MC.num_ergodic_classes < self.num_in_CO_A

        if self.CO_A_type == "CO_A_3":
            return self.MC.num_strongly_connected_components < self.num_in_CO_A

        raise Exception("Unknown condition A chosen (CO_A).")

    def condition_B(self) -> bool:
        """Returns whether condition B is True or False."""

        if self.CO_B_type == "CO_B_2":
            return self.MC.num_ergodic_classes < self.num_in_CO_B

        raise Exception("Unknown condition B chosen (CO_B).")

    def run(self) -> None:
        """Runs KDA with conditions CO_A and CO_B."""

        self.iterations_count = 0  # of the outer while loop of KDA

        # start cutting till condition A fails
        while self.condition_A():

            self.cut_till_condition_B_fails()
            self.iterations_count += 1

    def cut_till_condition_B_fails(self):
        """Cuts edges till condition B fails."""

        if self.CO_B_type == "CO_B_1":

            edges = self.MC.most_connecting_edges(self.num_in_CO_B)
            self.cut_edges(*edges)

        elif self.CO_B_type == "CO_B_2":

            row_indexes, col_indexes = self.MC.sorted_edges()
            num_cut = 0

            while self.condition_B():  # inner while loop
                self.cut_edges(
                    row_indexes[num_cut : num_cut + 1],
                    col_indexes[num_cut : num_cut + 1],
                )
                num_cut += 1

        elif self.CO_B_type == "CO_B_3":

            edges = self.MC.edges_below_threshold(self.num_in_CO_B)
            self.cut_edges(*edges)

        else:
            raise Exception("Unknown condition B chosen (CO_B).")

    def cut_edges(self, *args):
        """Cut given edges in the Markov chain and normalize afterward.

        There are different options to specify which edges to cut via args.
        When self.symmetric_cut = True, also the reversed edges are cut.

        Parameters
        ----------
        args :
            There are three options for args:
                1. One tuple of length 2 indicating which edge to cut.
                2. One list of tuples of edges which to cut.
                3. Two lists or np.ndarrays indicating which edges to cut.

        """

        if len(args) == 1:
            if isinstance(args[0], tuple):
                self._cut_edges([args[0][0]], [args[0][1]])
            elif isinstance(args[0], list):
                assert all(isinstance(x, tuple) for x in args[0])
                row_indexes, col_indexes = map(list, zip(*args[0]))
                self._cut_edges(row_indexes, col_indexes)
            else:
                raise Exception(
                    "Expected list of tuples or tuple in case "
                    "one argument is given in KDA.cut_edges()."
                )
        elif len(args) == 2:
            self._cut_edges(args[0], args[1])
        else:
            raise Exception("Expected 1 or 2 arguments in KDA.cut_edges().")

    def _cut_edges(
        self, row_indexes: np.ndarray | list, col_indexes: np.ndarray | list
    ) -> None:
        """Cut given edges in the Markov chain and normalize afterward.

        When self.symmetric_cut = True, also the reversed edges are cut.

        Parameters
        ----------
        row_indexes : np.ndarray | list
            Row indexes of the edges to be cut.
        col_indexes : np.ndarray | list
            Columns indexes of the edges to be cut.

        """

        if self.symmetric_cut:
            row_indexes, col_indexes = np.concatenate(
                (row_indexes, col_indexes)
            ), np.concatenate((col_indexes, row_indexes))

        self.MC.P[row_indexes, col_indexes] = 0  # cut edges
        new_P = load_transition_matrix(self.MC.P, self.normalizer)
        self.MC = MarkovChain(new_P)
        self.log_edges(row_indexes, col_indexes)
        self.check_for_infinite_loops()

    def log_edges(
        self, row_indexes: np.ndarray | list, col_indexes: np.ndarray | list
    ) -> None:
        """Logs the edges that have been cut.

        Parameters
        ----------
        row_indexes : np.ndarray | list
            Row indexes of the edges that are cut.
        col_indexes : np.ndarray | list
            Columns indexes of the edges that are cut.
        """

        edges = [(x, y) for x, y in zip(row_indexes, col_indexes)]
        self.log["edges cut"].append(edges)
        self.log["Markov chains"].append(self.MC.copy())

    def report(self):  # pragma: no cover
        """Prints a report of KDA."""

        # init
        edges_cut = self.log["edges cut"]
        MCs = self.log["Markov chains"]

        print("\nKDA report:")
        print("===========\n")

        print("KDA settings:")
        print(f"- Condition A: {self.CO_A} ({self.condition_A_translator()})")
        print(f"- Condition B: {self.CO_B} ({self.condition_B_translator()})")
        print(f"- Symmetric cut: {self.symmetric_cut}\n")

        print("Initial Markov chain (MC) is:")
        print(f"{MCs[0]}\n")

        print("KDA progress is:")
        for iteration_num, (edges_cut, MC) in enumerate(zip(edges_cut, MCs)):
            print(f"* Iteration {iteration_num}:")
            print(f"   - Edges cut: {edges_cut}")
            print(f"   - Ergodic classes: {MC.ergodic_classes}")
            print(f"   - Transient classes: {MC.transient_classes}")
            scc = MC.strongly_connected_components
            print(f"   - Strongly connected components: {scc}")
            wcc = MC.weakly_connected_components
            print(f"   - Weakly connected components: {wcc}")

    def check_for_infinite_loops(self):
        """Due to fixes for normalization, it may happen that the same edge is
        cut over and over again. This check raises an error in that case.
        """

        if len(self.log["edges cut"]) > 2:
            edges_cut_now = set(self.log["edges cut"][-1])
            edges_cut_prev = set(self.log["edges cut"][-2])
            edges_cut_prev2 = set(self.log["edges cut"][-3])
            same_edges_cut = edges_cut_now & edges_cut_prev & edges_cut_prev2
            if len(same_edges_cut) > 0:
                raise Warning(
                    "Possible infinite KDA loop detected: the following edges "
                    f"may be cut over and over again: {same_edges_cut}. "
                    f"If you see this message more than two times, try using"
                    f" another KDA setting that cuts less edges to avoid this."
                )

    @staticmethod
    def get_num_in_brackets(s):
        """Gets the number given between brackets in string s."""

        return int(s[s.find("(") + 1 : s.find(")")])

    def condition_A_translator(self):  # pragma: no cover
        """Translates the condition A into something readable."""

        if self.CO_A_type == "CO_A_1":
            return f"continue till {self.num_in_CO_A} times performed"

        if self.CO_A_type == "CO_A_2":
            return f"continue till # ergodic classes is {self.num_in_CO_A}"

        if self.CO_A_type == "CO_A_3":
            return (
                f"continue till # strongly connected components "
                f"{self.num_in_CO_A}"
            )

        raise Exception("Unknown condition A chosen (CO_A).")

    def condition_B_translator(self):  # pragma: no cover
        """Translates the condition B into something readable."""

        if self.CO_B_type == "CO_B_1":
            return f"continue till {self.num_in_CO_B} edge(s) cut"

        if self.CO_B_type == "CO_B_2":
            return f"continue till # ergodic classes is {self.num_in_CO_B}"

        if self.CO_B_type == "CO_B_3":
            return (
                f"continue till all edges with Kemeny constant derivative"
                f" < {self.num_in_CO_B} are cut"
            )

        raise Exception("Unknown condition B chosen (CO_B).")

    def plot_progress(self, **kwargs):  # pragma: no cover
        """Plots the Markov chains in the log. The kwargs are passed to the
        plot method of the Markov chain. Refer to MarkovChain.plot() for more
        details."""

        for i, MC in enumerate(self.log["Markov chains"]):
            MC.plot(**kwargs)

    def plot(self, **kwargs):  # pragma: no cover
        """Plots the Markov chain after KDA. Refer to MarkovChain.plot() for
        more details."""

        self.MC.plot(**kwargs)

__init__(original_MC, CO_A='CO_A_1(1)', CO_B='CO_B_3(0)', symmetric_cut=False, verbose=False, normalizer=standard_row_normalization)

Parameters:

  • original_MC (MarkovChain) –

    Original Markov chain object to which KDA will be applied.

  • CO_A (str, default: 'CO_A_1(1)' ) –

    Condition of how often the Kemeny constant derivatives are being recalculated (outer while loop in KDA). The options are:

    - 'CO_A_1(i)' = Number of times performed < i
    - 'CO_A_2(E)' = Number of ergodic classes in current MC is < E
    - 'CO_A_3(C)' = Number of strongly connected components in
                    current MC is < C
    
  • CO_B (str, default: 'CO_B_3(0)' ) –

    Condition of how many edges are being cut per iteration (inner while loop in KDA). The options are:

    - 'CO_B_1(e)' = Number of edges cut is < e
    - 'CO_B_2(E)' = Number of ergodic classes in MC is < E
    - 'CO_B_3(q)' = Not all edges with MC.KDer < q are cut
    
  • symmetric_cut (bool, default: False ) –

    If True, cutting (i, j) will also cut (j, i). If False, only (i, j).

  • verbose (bool, default: False ) –

    If true, information will be printed, else not.

  • normalizer (normalizer_type, default: standard_row_normalization ) –

    Normalizer used to create a stochastic matrix from a matrix.

Source code in pykda\KDA.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def __init__(
    self,
    original_MC: MarkovChain,
    CO_A: str = "CO_A_1(1)",
    CO_B: str = "CO_B_3(0)",
    symmetric_cut: bool = False,
    verbose: bool = False,
    normalizer: _normalizer_type = standard_row_normalization,
):
    """
    Parameters
    ----------
    original_MC : MarkovChain
        Original Markov chain object to which KDA will be applied.
    CO_A : str
        Condition of how often the Kemeny constant derivatives are being
        recalculated (outer while loop in KDA). The options are:

            - 'CO_A_1(i)' = Number of times performed < i
            - 'CO_A_2(E)' = Number of ergodic classes in current MC is < E
            - 'CO_A_3(C)' = Number of strongly connected components in
                            current MC is < C
    CO_B : str
        Condition of how many edges are being cut per iteration (inner while
        loop in KDA). The options are:

            - 'CO_B_1(e)' = Number of edges cut is < e
            - 'CO_B_2(E)' = Number of ergodic classes in MC is < E
            - 'CO_B_3(q)' = Not all edges with MC.KDer < q are cut
    symmetric_cut : bool
        If True, cutting (i, j) will also cut (j, i). If False, only (i, j).
    verbose : bool
        If true, information will be printed, else not.
    normalizer : normalizer_type
        Normalizer used to create a stochastic matrix from a matrix.
    """

    self.MC = original_MC.copy()
    self.log = {  # also log original MC after "iteration 0"
        "edges cut": [[None]],
        "Markov chains": [self.MC.copy()],
    }
    self.CO_A = CO_A
    self.CO_A_type = CO_A[:6]
    self.num_in_CO_A = self.get_num_in_str_brackets(CO_A)
    self.CO_B = CO_B
    self.CO_B_type = CO_B[:6]
    self.num_in_CO_B = self.get_num_in_str_brackets(CO_B)
    self.symmetric_cut = symmetric_cut
    self.normalizer = normalizer
    self.verbose = verbose

check_for_infinite_loops()

Due to fixes for normalization, it may happen that the same edge is cut over and over again. This check raises an error in that case.

Source code in pykda\KDA.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def check_for_infinite_loops(self):
    """Due to fixes for normalization, it may happen that the same edge is
    cut over and over again. This check raises an error in that case.
    """

    if len(self.log["edges cut"]) > 2:
        edges_cut_now = set(self.log["edges cut"][-1])
        edges_cut_prev = set(self.log["edges cut"][-2])
        edges_cut_prev2 = set(self.log["edges cut"][-3])
        same_edges_cut = edges_cut_now & edges_cut_prev & edges_cut_prev2
        if len(same_edges_cut) > 0:
            raise Warning(
                "Possible infinite KDA loop detected: the following edges "
                f"may be cut over and over again: {same_edges_cut}. "
                f"If you see this message more than two times, try using"
                f" another KDA setting that cuts less edges to avoid this."
            )

condition_A()

Returns whether condition A is True or False.

Source code in pykda\KDA.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
def condition_A(self) -> bool:
    """Returns whether condition A is True or False."""

    if self.CO_A_type == "CO_A_1":
        return self.iterations_count < self.num_in_CO_A

    if self.CO_A_type == "CO_A_2":
        return self.MC.num_ergodic_classes < self.num_in_CO_A

    if self.CO_A_type == "CO_A_3":
        return self.MC.num_strongly_connected_components < self.num_in_CO_A

    raise Exception("Unknown condition A chosen (CO_A).")

condition_A_translator()

Translates the condition A into something readable.

Source code in pykda\KDA.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
def condition_A_translator(self):  # pragma: no cover
    """Translates the condition A into something readable."""

    if self.CO_A_type == "CO_A_1":
        return f"continue till {self.num_in_CO_A} times performed"

    if self.CO_A_type == "CO_A_2":
        return f"continue till # ergodic classes is {self.num_in_CO_A}"

    if self.CO_A_type == "CO_A_3":
        return (
            f"continue till # strongly connected components "
            f"{self.num_in_CO_A}"
        )

    raise Exception("Unknown condition A chosen (CO_A).")

condition_B()

Returns whether condition B is True or False.

Source code in pykda\KDA.py
113
114
115
116
117
118
119
def condition_B(self) -> bool:
    """Returns whether condition B is True or False."""

    if self.CO_B_type == "CO_B_2":
        return self.MC.num_ergodic_classes < self.num_in_CO_B

    raise Exception("Unknown condition B chosen (CO_B).")

condition_B_translator()

Translates the condition B into something readable.

Source code in pykda\KDA.py
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
def condition_B_translator(self):  # pragma: no cover
    """Translates the condition B into something readable."""

    if self.CO_B_type == "CO_B_1":
        return f"continue till {self.num_in_CO_B} edge(s) cut"

    if self.CO_B_type == "CO_B_2":
        return f"continue till # ergodic classes is {self.num_in_CO_B}"

    if self.CO_B_type == "CO_B_3":
        return (
            f"continue till all edges with Kemeny constant derivative"
            f" < {self.num_in_CO_B} are cut"
        )

    raise Exception("Unknown condition B chosen (CO_B).")

cut_edges(*args)

Cut given edges in the Markov chain and normalize afterward.

There are different options to specify which edges to cut via args. When self.symmetric_cut = True, also the reversed edges are cut.

Parameters:

  • args

    There are three options for args: 1. One tuple of length 2 indicating which edge to cut. 2. One list of tuples of edges which to cut. 3. Two lists or np.ndarrays indicating which edges to cut.

Source code in pykda\KDA.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def cut_edges(self, *args):
    """Cut given edges in the Markov chain and normalize afterward.

    There are different options to specify which edges to cut via args.
    When self.symmetric_cut = True, also the reversed edges are cut.

    Parameters
    ----------
    args :
        There are three options for args:
            1. One tuple of length 2 indicating which edge to cut.
            2. One list of tuples of edges which to cut.
            3. Two lists or np.ndarrays indicating which edges to cut.

    """

    if len(args) == 1:
        if isinstance(args[0], tuple):
            self._cut_edges([args[0][0]], [args[0][1]])
        elif isinstance(args[0], list):
            assert all(isinstance(x, tuple) for x in args[0])
            row_indexes, col_indexes = map(list, zip(*args[0]))
            self._cut_edges(row_indexes, col_indexes)
        else:
            raise Exception(
                "Expected list of tuples or tuple in case "
                "one argument is given in KDA.cut_edges()."
            )
    elif len(args) == 2:
        self._cut_edges(args[0], args[1])
    else:
        raise Exception("Expected 1 or 2 arguments in KDA.cut_edges().")

cut_till_condition_B_fails()

Cuts edges till condition B fails.

Source code in pykda\KDA.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def cut_till_condition_B_fails(self):
    """Cuts edges till condition B fails."""

    if self.CO_B_type == "CO_B_1":

        edges = self.MC.most_connecting_edges(self.num_in_CO_B)
        self.cut_edges(*edges)

    elif self.CO_B_type == "CO_B_2":

        row_indexes, col_indexes = self.MC.sorted_edges()
        num_cut = 0

        while self.condition_B():  # inner while loop
            self.cut_edges(
                row_indexes[num_cut : num_cut + 1],
                col_indexes[num_cut : num_cut + 1],
            )
            num_cut += 1

    elif self.CO_B_type == "CO_B_3":

        edges = self.MC.edges_below_threshold(self.num_in_CO_B)
        self.cut_edges(*edges)

    else:
        raise Exception("Unknown condition B chosen (CO_B).")

get_num_in_brackets(s) staticmethod

Gets the number given between brackets in string s.

Source code in pykda\KDA.py
284
285
286
287
288
@staticmethod
def get_num_in_brackets(s):
    """Gets the number given between brackets in string s."""

    return int(s[s.find("(") + 1 : s.find(")")])

get_num_in_str_brackets(s) staticmethod

Returns the number given between brackets in string s.

Source code in pykda\KDA.py
93
94
95
96
97
@staticmethod
def get_num_in_str_brackets(s):
    """Returns the number given between brackets in string s."""

    return int(s[s.find("(") + 1 : s.find(")")])

log_edges(row_indexes, col_indexes)

Logs the edges that have been cut.

Parameters:

  • row_indexes (ndarray | list) –

    Row indexes of the edges that are cut.

  • col_indexes (ndarray | list) –

    Columns indexes of the edges that are cut.

Source code in pykda\KDA.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def log_edges(
    self, row_indexes: np.ndarray | list, col_indexes: np.ndarray | list
) -> None:
    """Logs the edges that have been cut.

    Parameters
    ----------
    row_indexes : np.ndarray | list
        Row indexes of the edges that are cut.
    col_indexes : np.ndarray | list
        Columns indexes of the edges that are cut.
    """

    edges = [(x, y) for x, y in zip(row_indexes, col_indexes)]
    self.log["edges cut"].append(edges)
    self.log["Markov chains"].append(self.MC.copy())

plot(**kwargs)

Plots the Markov chain after KDA. Refer to MarkovChain.plot() for more details.

Source code in pykda\KDA.py
332
333
334
335
336
def plot(self, **kwargs):  # pragma: no cover
    """Plots the Markov chain after KDA. Refer to MarkovChain.plot() for
    more details."""

    self.MC.plot(**kwargs)

plot_progress(**kwargs)

Plots the Markov chains in the log. The kwargs are passed to the plot method of the Markov chain. Refer to MarkovChain.plot() for more details.

Source code in pykda\KDA.py
324
325
326
327
328
329
330
def plot_progress(self, **kwargs):  # pragma: no cover
    """Plots the Markov chains in the log. The kwargs are passed to the
    plot method of the Markov chain. Refer to MarkovChain.plot() for more
    details."""

    for i, MC in enumerate(self.log["Markov chains"]):
        MC.plot(**kwargs)

report()

Prints a report of KDA.

Source code in pykda\KDA.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
def report(self):  # pragma: no cover
    """Prints a report of KDA."""

    # init
    edges_cut = self.log["edges cut"]
    MCs = self.log["Markov chains"]

    print("\nKDA report:")
    print("===========\n")

    print("KDA settings:")
    print(f"- Condition A: {self.CO_A} ({self.condition_A_translator()})")
    print(f"- Condition B: {self.CO_B} ({self.condition_B_translator()})")
    print(f"- Symmetric cut: {self.symmetric_cut}\n")

    print("Initial Markov chain (MC) is:")
    print(f"{MCs[0]}\n")

    print("KDA progress is:")
    for iteration_num, (edges_cut, MC) in enumerate(zip(edges_cut, MCs)):
        print(f"* Iteration {iteration_num}:")
        print(f"   - Edges cut: {edges_cut}")
        print(f"   - Ergodic classes: {MC.ergodic_classes}")
        print(f"   - Transient classes: {MC.transient_classes}")
        scc = MC.strongly_connected_components
        print(f"   - Strongly connected components: {scc}")
        wcc = MC.weakly_connected_components
        print(f"   - Weakly connected components: {wcc}")

run()

Runs KDA with conditions CO_A and CO_B.

Source code in pykda\KDA.py
121
122
123
124
125
126
127
128
129
130
def run(self) -> None:
    """Runs KDA with conditions CO_A and CO_B."""

    self.iterations_count = 0  # of the outer while loop of KDA

    # start cutting till condition A fails
    while self.condition_A():

        self.cut_till_condition_B_fails()
        self.iterations_count += 1