diff --git a/xlib/console/console.py b/xlib/console/console.py index 9cf1269..9406ce5 100644 --- a/xlib/console/console.py +++ b/xlib/console/console.py @@ -2,13 +2,15 @@ from typing import Generator _progress_symbols = "|/-\\" -def progress_bar_iterator(iterable, desc = '') -> Generator: - n_count = len(iterable) - - progress_bar_print(0, n_count, desc) +def progress_bar_iterator(iterable, count : int = None, desc = '', suppress_print=False) -> Generator: + if count is None: + count = len(iterable) + if not suppress_print: + progress_bar_print(0, count, desc) for i, item in enumerate(iterable): yield item - progress_bar_print(i + 1, n_count, desc) + if not suppress_print: + progress_bar_print(i + 1, count, desc) def progress_bar_print(n, n_count, desc = ''): str_max_len = 80 @@ -19,8 +21,7 @@ def progress_bar_print(n, n_count, desc = ''): suffix_str = f'| {n}/{n_count}' bar_len = str_max_len - (prefix_str_len+len(suffix_str)) - - bar_head = '#'*int( (n/n_count)*bar_len) + bar_head = '#'*int( (n/ max(1,n_count) )*bar_len) if n != n_count: bar_head += _progress_symbols[n % len(_progress_symbols)] bar_tail = '-'*( bar_len - len(bar_head) ) diff --git a/xlib/console/diacon/Diacon.py b/xlib/console/diacon/Diacon.py new file mode 100644 index 0000000..568ebfc --- /dev/null +++ b/xlib/console/diacon/Diacon.py @@ -0,0 +1,337 @@ + +import threading +import time +from enum import IntEnum +from typing import Any, Callable, List, Tuple, Union + +from ... import text as lib_text + +class EDlgMode(IntEnum): + UNDEFINED = 0 + BACK = 1 + RELOAD = 2 + WRONG_INPUT = 3 + SUCCESS = 4 + + +class DlgChoice: + def __init__(self, name : str = None, row_desc : str = None): + super().__init__() + if len(name) == 0: + raise ValueError('Zero len name is not valid.') + self._name = name + self._row_desc = row_desc + + def get_name(self) -> Union[str, None]: return self._name + def get_row_desc(self) -> Union[str, None]: return self._row_desc + +class Dlg: + def __init__(self, title : str = None, has_go_back=True): + """ + + """ + self._title = title + self._has_go_back = has_go_back + + def get_name(self) -> str: return self._name + + def handle_user_input(self, s : str) -> EDlgMode: + """ + + """ + s = s.strip() + + # ? and < available in any dialog, handle them first + s_len = len(s) + if s_len == 0: + return EDlgMode.RELOAD + if s_len == 1: + #if s == '?': + # return EDlgMode.RELOAD + if s == '<': + return EDlgMode.BACK + + return self.on_user_input(s) + + def print(self, table_width_max=80, col_spacing = 3): + """ + print dialog + """ + + # Gather table lines + table_def : List[str]= [] + + if self._has_go_back: + table_def.append('| < | Go back.') + + table_def.append('|99') + table_def = self.on_print(table_def) + + table = lib_text.ascii_table(table_def, max_table_width=80, + left_border = None, + right_border = None, + border = ' | ', + row_symbol = None) + print() + print(table) + + + #overridable + def on_print(self, table_lines : List[Tuple[str,str]]): + return table_lines + + #overridable + def on_user_input(self, s : str) -> EDlgMode: + """ + handle user input + return False if input is invalid + """ + return EDlgMode.UNDEFINED + +class DlgChoices(Dlg): + def __init__(self, choices : List[DlgChoice], multiple_choices=False, title : str = None, has_go_back = True): + """ + + """ + super().__init__(title=title, has_go_back=has_go_back) + self._choices = choices + self._multiple_choices = multiple_choices + + self._results = None + self._results_id = None + + self._short_names = [choice.get_name() for choice in choices] + + # if any([x is not None for x in self._short_names]): + # # Using short names from choices + # if any([x is None for x in self._short_names]): + # raise Exception('No short name for one of choices.') + # if len(set(self._short_names)) != len(self._short_names): + # raise ValueError(f'Contains duplicate short names: {self._short_names}') + # else: + + # Make short names for all choices + names = [ choice.get_name() for choice in choices ] + names_len = len(names) + + if len(set(names)) != names_len: + raise ValueError(f'Contains duplicate name of choice : {names}') + + short_names_len = [1]*names_len + while True: + short_names = [ name[:short_names_len[i_name]] for i_name, name in enumerate(names) ] + + has_dup = False + for i in range(names_len): + i_short_name = short_names[i] + + match_count = 0 + for j in range(names_len): + j_short_name = short_names[j] + if i_short_name == j_short_name: + match_count += 1 + + if match_count > 1: + has_dup = True + short_names_len[i] += 1 + + if not has_dup: + break + self._short_names = short_names + + + + def get_selected_choices(self) -> List[DlgChoice]: + """ + returns selected choices + """ + return self._results + + def get_selected_choices_id(self) -> List[int]: + """ + returns selected choice + """ + return self._results_id + + #overridable + def on_print(self, table_def : List[str]): + for short_name, choice in zip(self._short_names, self._choices): + row_def = f'| {short_name}' + row_desc = choice.get_row_desc() + if row_desc is not None: + row_def += row_desc + table_def.append(row_def) + + return table_def + + #overridable + def on_user_input(self, s : str) -> bool: + result = super().on_user_input(s) + if result == EDlgMode.UNDEFINED: + + if self._multiple_choices: + multi_s = s.split(',') + else: + multi_s = [s] + + results = [] + results_id = [] + for s in multi_s: + s = s.strip() + + x = [ i for i,short_name in enumerate(self._short_names) if s == short_name ] + if len(x) == 0: + # no short name match + return EDlgMode.WRONG_INPUT + else: + id = x[0] + results_id.append(id) + results.append(self._choices[id]) + + if len(set(results_id)) != len(results_id): + # Duplicate input + return EDlgMode.WRONG_INPUT + + self._results = results + self._results_id = results_id + + return EDlgMode.SUCCESS + + return result + + + +class _Diacon: + """ + User dialog with via console. + + Internal architecture: + + [ + Main-Thread + + current thread from which __init__() called + ] + + [ + Dialog-Thread + + separate thread where dialogs are handled and dynamically created + + we need this thread, because main thread can be busy, + for example training neural network + + calls on_dlg() provided with __init__ + + thus keep in mind on_dlg() works in separate thread + + This thread must not be blocked inside on_dlg(), + because Diacon.stop() can be called that stops all threads. + ] + + [ + Input-Thread + + separate thread where user input is accepted in non-blocking mode, + and transfered to processing thread + ] + """ + + def __init__(self): + self._on_dlg : Callable = None + + self._lock = threading.RLock() + self._current_dlg : Dlg = None + self._new_dlg : Dlg = None + + self._started = False + self._dialog_t : threading.Thread = None + self._input_t : threading.Thread = None + self._input_request = False + self._input_result : str = None + + def start(self, on_dlg : Callable): + if self._started: + raise Exception('Diacon already started.') + self._started = True + self._on_dlg = on_dlg + + self._input_t = threading.Thread(target=self._input_thread, daemon=True) + self._input_t.start() + self._dialog_t = threading.Thread(target=self._dialog_thread, daemon=True) + self._dialog_t.start() + + def stop(self): + if not self._started: + raise Exception('Diacon not started.') + self._started = False + self._dialog_t.join() + self._dialog_t = None + self._input_t.join() + self._input_t = None + + def _input_thread(self,): + while self._started: + if self._input_request: + try: + input_result = input() + except Exception as e: + input_result = '' + + with self._lock: + self._input_result = input_result + self._input_request = False + time.sleep(0.050) + + + def _dialog_thread(self, ): + self._on_dlg(None, EDlgMode.RELOAD) + + while self._started: + + with self._lock: + + if self._new_dlg is not None: + (new_dlg, is_print), self._new_dlg = self._new_dlg, None + if new_dlg is not None: + self._current_dlg = new_dlg + if is_print: + self._current_dlg.print() + self._request_input() + + input_result = self._fetch_input() + if input_result is not None: + + if self._current_dlg is not None: + mode = self._current_dlg.handle_user_input(input_result) + if mode == EDlgMode.WRONG_INPUT: + print('\nWrong input') + mode = EDlgMode.RELOAD + if mode == EDlgMode.UNDEFINED: + mode = EDlgMode.RELOAD + self._on_dlg(self._current_dlg, mode) + continue + + time.sleep(0.005) + + def _fetch_input(self): + with self._lock: + result = None + if self._input_result is not None: + result, self._input_result = self._input_result, None + return result + + def _request_input(self): + with self._lock: + if not self._input_request: + self._input_result = None + self._input_request = True + + def update_dlg(self, new_dlg = None, print=True ): + """ + show current or set new Dialog + Can be called from any thread. + """ + self._new_dlg = (new_dlg, print) + +Diacon = _Diacon() diff --git a/xlib/console/diacon/__init__.py b/xlib/console/diacon/__init__.py new file mode 100644 index 0000000..34d2886 --- /dev/null +++ b/xlib/console/diacon/__init__.py @@ -0,0 +1 @@ +from .Diacon import Diacon, Dlg, DlgChoice, DlgChoices, EDlgMode diff --git a/xlib/text/__init__.py b/xlib/text/__init__.py new file mode 100644 index 0000000..2be600f --- /dev/null +++ b/xlib/text/__init__.py @@ -0,0 +1 @@ +from .ascii_table import ascii_table \ No newline at end of file diff --git a/xlib/text/ascii_table.py b/xlib/text/ascii_table.py new file mode 100644 index 0000000..af823cc --- /dev/null +++ b/xlib/text/ascii_table.py @@ -0,0 +1,294 @@ +import re +from typing import Union, List +_opts_halign = {'l':0,'c':1,'r':2} +_opts_valign = {'t':0,'m':1,'b':2} + +""" +test = [ + '|c99 TABLE NAME', + '|3 3-span left align\n multiline row |rb2 2-span right bottom align', + '|c WWWWWWWWWW |c WWWWWWWWWW |c WWWWWWWWWW |c WWWWWWWWWW |c WWWWWWWWWW', + '|c3 center aligned 3-span |r2 2-span right align', + '|r 0 |c3 Center align\nmulti\nline\nrow |l 1.00', + '|r 1 |r3 Right align\nmulti\nline\nrow |l 1.00', + '| ? | s', + '| ? | Three |c Two | asdasd | asdasd', + '| ? |3 asdasdasdasdasdasdasdasdasdasdasda |3 asdasd', + ] +""" + +class Column: + __slots__ = ['halign', 'valign', 'span', 'content'] + + def __init__(self, halign : int = 0, valign : int = 0, span : int = 1, content : str = None): + self.halign, self.valign, self.span, self.content = halign, valign, span, content + + def __str__(self): return f'{self.content} s:{self.span}' + def __repr__(self): return self.__str__() + + def split(self, sep : Union[str,int], maxsplit=-1) -> List['Column']: + result = [] + if isinstance(sep, int): + c_split = [ self.content[:sep], self.content[sep:] ] + else: + c_split = self.content.split(sep, maxsplit=maxsplit) + + if len(c_split) == 1: + return [self] + for c in c_split: + col = Column() + col.halign = self.halign + col.valign = self.valign + col.span = self.span + col.content = c + result.append(col) + return result + + def copy(self, content=...): + if content is Ellipsis: + content=self.content + + column = Column() + column.halign = self.halign + column.valign = self.valign + column.span = self.span + column.content = content + return column + +def ascii_table(table_def : List[str], + min_table_width : int = None, + max_table_width : int = None, + fixed_table_width : int = None, + style_borderless = False, + left_border : str= '|', + right_border : str = '|', + border : str= '|', + row_symbol : str = '-', + col_def_delim = '|', + ) -> str: + """ + + arguments + + table_def list of str + + |[options] data - defines new column + + options: + halign: l - left (default), c - center, r - right + valign: t - top (default), m - center, b - bottom + 1..N - col span + + example: ['|c99 TABLE NAME', + '|l first col |r second col'] + """ + if style_borderless: + left_border, right_border, border, row_symbol = None, None, ' | ', None + + if fixed_table_width is not None: + min_table_width = fixed_table_width + max_table_width = fixed_table_width + + if min_table_width is not None and max_table_width is not None: + if min_table_width > max_table_width: + raise ValueError('min_table_width > max_table_width') + + col_spacing = len(border) if border is not None else 0 + cols_count = 0 + + # Parse columns in table_def + rows : List[List[Column]] = [] + for raw_line in table_def: + # Line must starts with column definition + if len(raw_line) == 0 or raw_line[0] != col_def_delim: + raise ValueError(f'Line does not start with | symbol, content: "{raw_line}"') + + # Parsing raw columns + row : List[Column] = [] + i_raw_col = 0 + raw_line_split = raw_line.split(col_def_delim)[1:] + raw_line_split_len = len(raw_line_split) + + for n_raw_col, raw_col in enumerate(raw_line_split): + # split column options and content + col_opts, col_content = ( raw_col.split(' ', maxsplit=1) + [''] )[:2] + + # Parse column options + col = Column(content=col_content) + for col_opt in re.findall('[lcr]|[tmb]|[0-9]+', col_opts.lower()): + h = _opts_halign.get(col_opt, None) + if h is not None: + col.halign = h + continue + v = _opts_valign.get(col_opt, None) + if v is not None: + col.valign = v + continue + col.span = max(1, int(col_opt)) + row.append(col) + + if n_raw_col != raw_line_split_len-1: + i_raw_col += col.span + else: + # total max columns, by last column without span + cols_count = max(cols_count, i_raw_col+1) + + rows.append(row) + + # Cut span of last cols to fit cols_count + for row in rows: + row[-1].span = cols_count - (sum(col.span for col in row) - row[-1].span) + + # Compute cols border indexes + cols_border = [0]*cols_count + for i_col_max in range(cols_count+1): + for row in rows: + i_col = 0 + col_border = 0 + for col in row: + i_col += col.span + col_max_len = max([ len(x.strip()) for x in col.content.split('\n')]) + col_border = cols_border[i_col-1] = max(cols_border[i_col-1], col_border + col_max_len) + if i_col >= i_col_max: + break + col_border += col_spacing + + # fix zero cols border + for i_col, col_border in enumerate(cols_border): + if i_col != 0 and col_border == 0: + cols_border[i_col] = cols_border[i_col-1] + + table_width = cols_border[-1] + (len(left_border) if left_border is not None else 0) + \ + (len(right_border) if right_border is not None else 0) + + # Determine size of table width + table_width_diff = 0 + if max_table_width is not None: + table_width_diff = max(table_width_diff, table_width - max_table_width) + if min_table_width is not None: + table_width_diff = min(table_width_diff, table_width - min_table_width) + + if table_width_diff != 0: + # >0 :shrink, <0 :expand table + diffs = [ x-y for x,y in zip(cols_border, [0]+cols_border[:-1] ) ] + + while table_width_diff != 0: + if table_width_diff > 0: + max_diff = max(diffs) + if max_diff <= col_spacing: + raise Exception('Unable to shrink the table to fit max_table_width.') + + diffs[ diffs.index(max_diff) ] -= 1 + else: + diffs[ diffs.index(min(diffs)) ] += 1 + + table_width_diff += 1 if table_width_diff < 0 else -1 + + for i in range(len(cols_border)): + cols_border[i] = diffs[i] if i == 0 else cols_border[i-1] + diffs[i] + + # recompute new table_width + table_width = cols_border[-1] + (len(left_border) if left_border is not None else 0) + \ + (len(right_border) if right_border is not None else 0) + + # Process columns for \n and col width + new_rows : List[List[List[Column]]] = [] + for row in rows: + row_len = len(row) + + # Gather multi rows for every col + cols_sub_rows = [] + + i_col = 0 + col_border = 0 + for col in row: + i_col += col.span + col_border_next = cols_border[i_col-1] + + col_width = col_border_next-col_border + + # slice col to sub rows by \n separator and col_width + col_content_split = [ x.strip() for x in col.content.split('\n') ] + cols_sub_rows.append([ x[i:i+col_width].strip() for x in col_content_split + for i in range(0, len(x), col_width) ]) + + col_border = col_border_next + col_spacing + + cols_sub_rows_max = max([len(x) for x in cols_sub_rows]) + + for n, (col, col_sub_rows) in enumerate(zip(row, cols_sub_rows)): + valign = col.valign + + unfilled_rows = cols_sub_rows_max-len(col_sub_rows) + if valign == 0: # top + col_sub_rows = col_sub_rows + ['']*unfilled_rows + elif valign == 1: # center + top_pad = unfilled_rows // 2 + bottom_pad = unfilled_rows - top_pad + col_sub_rows = ['']*top_pad + col_sub_rows + ['']*bottom_pad + elif valign == 2: # bottom + col_sub_rows = ['']*unfilled_rows + col_sub_rows + + cols_sub_rows[n] = col_sub_rows + + sub_rows = [ [None]*row_len for _ in range(cols_sub_rows_max) ] + for n_col, col in enumerate(row): + for i in range(cols_sub_rows_max): + sub_rows[i][n_col] = col.copy(content=cols_sub_rows[n_col][i]) + + new_rows.append(sub_rows) + + rows = new_rows + + # Composing final lines + lines = [] + + row_line = row_symbol[0]*table_width if row_symbol is not None else None + if row_line is not None: + lines.append(row_line) + for sub_rows in rows: + + for row in sub_rows: + line = '' + + if left_border is not None: + line += left_border + + i_col = 0 + for col in row: + col_content = col.content + + if i_col == 0: + col_border0 = 0 + else: + if border is not None: + line += border + col_border0 = cols_border[i_col-1] + col_spacing + + i_col += col.span + + col_border1 = cols_border[i_col-1] + + col_space = col_border1 - col_border0 + col_remain_space = col_space-len(col_content) + + halign = col.halign + if halign == 0: # left + col_content = col_content + ' '*col_remain_space + elif halign == 1: # center + col_left_pad = col_remain_space // 2 + col_right_pad = col_remain_space - col_left_pad + col_content = ' '*col_left_pad + col_content + ' '*col_right_pad + elif halign == 2: # right + col_content = ' '*col_remain_space + col_content + + line += col_content + + if right_border is not None: + line += right_border + + lines.append(line) + if row_line is not None: + lines.append(row_line) + + return '\n'.join(lines) \ No newline at end of file