Source code for pimlico.cli.shell.base

import traceback
from operator import itemgetter

import os
from cmd import Cmd

import readline

from pimlico import LOG_DIR
from pimlico.utils.core import is_identifier

HISTORY_FILE = os.path.join(LOG_DIR, "data_shell_history")
# Limit stored history
readline.set_history_length(500)


[docs]class ShellCommand(object): """ Base class used to provide commands for exploring a particular datatype. A basic set of commands is provided for all datatypes, but specific datatype classes may provide their own, by overriding the `shell_commands` attribute. """ # List of command strings that trigger this command commands = [] help_text = None
[docs] def execute(self, shell, *args, **kwargs): raise NotImplementedError
[docs]class DataShell(Cmd): """ Terminal shell for querying datatypes. """ prompt = ">>> " def __init__(self, data, commands, *args, **kwargs): Cmd.__init__(self, *args, **kwargs) self.data = data self.result_limit = 10 readline.set_completer_delims(" ") # Index commands by their command strings for easy lookup self.commands = {} for command in commands: for command_str in command.commands: self.commands[command_str] = command # Add help method for each command for command_name in self.commands: if is_identifier(command_name): def _help_cmd(): print self.commands[command_name].help_text setattr(self, "help_%s" % command_name, _help_cmd) # Environment for executing Python commands # May get updated as time goes on self.env = { "data": self.data, }
[docs] def do_EOF(self, line): """ Exits the shell """ print "\nExiting" return True
[docs] def preloop(self): # Load shell history if os.path.exists(HISTORY_FILE): readline.read_history_file(HISTORY_FILE)
[docs] def postloop(self): # Save shell history readline.write_history_file(HISTORY_FILE)
[docs] def emptyline(self): """ Don't repeat the last command (default): ignore empty lines """ return
[docs] def default(self, line): """ We use this to handle most commands, including the default fallback, which is to execute Python """ parts = line.split() if len(parts): if parts[0] in self.commands: cmd = self.commands[parts[0]] # Process the rest of the line to get args and kwargs kwargs = dict(itemgetter(0, 2)(arg.partition("=")) for arg in parts[1:] if "=" in arg) args = [arg for arg in parts[1:] if "=" not in arg] cmd.execute(self, *args, **kwargs) else: # If this isn't recognised as a command, try executing with Python interpreter exec line in self.env
[docs] def cmdloop(self, intro=None): if intro or self.intro: print intro or self.intro while True: try: Cmd.cmdloop(self, intro="") except ShellError, e: print e except KeyboardInterrupt: print self.postloop() except: # Print out the stack trace and return to the shell print "Error running command:" traceback.print_exc() self.postloop() else: self.postloop() break
[docs]class ShellError(Exception): pass