Source code for pimlico.cli.shell.base

# This file is part of Pimlico
# Copyright (C) 2020 Mark Granroth-Wilding
# Licensed under the GNU LGPL v3.0 - https://www.gnu.org/licenses/lgpl-3.0.en.html

from __future__ import print_function
from builtins import object

import os
import readline
import traceback
from cmd import Cmd
from operator import itemgetter

from pimlico import LOG_DIR
from pimlico.utils.core import is_identifier

HISTORY_FILE = os.path.join(LOG_DIR, "data_shell_history")
# Limit stored history
readline.set_history_length(500)


[docs]class ShellCommand(object): """ Base class used to provide commands for exploring a particular datatype. A basic set of commands is provided for all datatypes, but specific datatype classes may provide their own, by overriding the `shell_commands` attribute. """ # List of command strings that trigger this command commands = [] help_text = None
[docs] def execute(self, shell, *args, **kwargs): """ Execute the command. Get the dataset reader as shell.data. :param shell: DataShell instance. Reader available as shell.data :param args: Args given by the user :param kwargs: Named args given by the user as key=val """ raise NotImplementedError
[docs]class DataShell(Cmd): """ Terminal shell for querying datatypes. """ prompt = ">>> " def __init__(self, data, commands, *args, **kwargs): Cmd.__init__(self, *args, **kwargs) self.data = data self.result_limit = 10 readline.set_completer_delims(" ") # Index commands by their command strings for easy lookup self.commands = {} for command_obj in commands: for command_str in command_obj.commands: self.commands[command_str] = command_obj # Add help method for each command for command_name in self.commands: # We can only do this with command names that are valid Python identifiers if is_identifier(command_name): # Use closures to bind the command name def _get_help_cmd(slf, name): def _help_cmd(): print(slf.commands[name].help_text) return _help_cmd setattr(self, "help_%s" % command_name, _get_help_cmd(self, command_name)) def _get_do_cmd(slf, name): def _do_cmd(line): slf._run_command(name, line.split()) return _do_cmd setattr(self, "do_%s" % command_name, _get_do_cmd(self, command_name)) # Environment for executing Python commands # May get updated as time goes on self.env = { "data": self.data, "reader": self.data, }
[docs] def get_names(self): # Overridden so it allows our help methods to be added dynamically return dir(self)
[docs] def do_EOF(self, line): """ Exits the shell """ print("\nExiting") return True
[docs] def preloop(self): # Load shell history if os.path.exists(HISTORY_FILE): readline.read_history_file(HISTORY_FILE)
[docs] def postloop(self): # Save shell history if not os.path.exists(os.path.dirname(HISTORY_FILE)): os.makedirs(os.path.dirname(HISTORY_FILE)) readline.write_history_file(HISTORY_FILE)
[docs] def emptyline(self): """ Don't repeat the last command (default): ignore empty lines """ return
def _run_command(self, cmd_name, parts): cmd = self.commands[cmd_name] # Process the rest of the line to get args and kwargs kwargs = dict(itemgetter(0, 2)(arg.partition("=")) for arg in parts if "=" in arg) args = [arg for arg in parts if "=" not in arg] cmd.execute(self, *args, **kwargs)
[docs] def default(self, line): """ We use this to handle commands that can't be handled using the `do_` pattern. Also handles the default fallback, which is to execute Python. """ parts = line.split() if len(parts): if parts[0] in self.commands: self._run_command(parts[0], parts[1:]) else: # If this isn't recognised as a command, try executing with Python interpreter exec(line, self.env)
[docs] def cmdloop(self, intro=None): if intro or self.intro: print(intro or self.intro) while True: try: Cmd.cmdloop(self, intro="") except ShellError as e: print(e) except KeyboardInterrupt: print() self.postloop() except: # Print out the stack trace and return to the shell print("Error running command:") traceback.print_exc() self.postloop() else: self.postloop() break
[docs]class ShellError(Exception): pass