author Ted Mielczarek <>
Wed, 24 Mar 2010 11:05:53 -0400
changeset 1 c2895bc6a54b
parent 0 5c0425382afa
permissions -rwxr-xr-x
update docs slightly
#!/usr/bin/env python

"""
The shell module provides classes that implement shell-like pipelines.

Commands are iterable; you must iterate over them to gather their output.

Commands may be strung together into pipelines using the | operator.
For example, sh("cat foo") | sh("sort") is roughly equivalent to
`cat foo | sort` in the shell. As a shortcut, bare strings and lists
following the first command are passed as the parameter to sh(), so
sh("cat foo") | "sort" is equivalent to the preceding example. Callables
involved in a pipeline will be called and are expected to produce an
instance of a command object, as a convenience allowing you to write
pipelines like:
  cat('foo') | sort

The result of a pipeline is the last command in the pipeline, which can
be iterated over to gather the results of the entire pipeline.

You may subclass cmd and provide your own __iter__ method to implement
your own commands.
"""


from itertools import izip, count
from subprocess import Popen, PIPE
from threading import Thread

__all__ = ['cmd', 'sh', 'cat', 'cut', 'sort', 'uniq']

class cmd(object):
  """Base class for all commands.

  Not directly useful on its own; you should use sh() or the other
  commands defined here, but you may subclass this and provide your own
  __iter__ method to implement your own command.

  Attributes:
    stdin: the command feeding this one, or None. Set by the | operator.
    stdout: the command this one feeds, or None. Set by the | operator.
    stderr: reserved; stderr is not properly handled yet.
  """

  stdin = None
  stdout = None
  #TODO: proper stderr handling
  stderr = None

  def __or__(self, other):
    """Override the | operator to provide shell-like pipelines.

    Multiple command objects may be piped together, with the result
    being the last command in the pipeline.

    The right-hand operand may be:
      * a command instance, which is linked as-is;
      * a string or a list, which is wrapped in sh();
      * any other callable (e.g. a bare command class), which is called
        with no arguments and expected to return a command.

    Returns the right-hand command, with its stdin set to this command.
    Raises Exception if the operand is none of the above.
    """
    # A command instance needs no conversion, so check for it first; this
    # also keeps an instance that happens to be callable from being called.
    if not isinstance(other, cmd):
      if isinstance(other, (basestring, list)):
        # shortcut, interpret bare strings/lists as shell commands
        other = sh(other)
      elif callable(other):
        # shortcut to use bare class names
        other = other()
      else:
        raise Exception("Can't pipe to a non-command!")
    other.stdin = self
    self.stdout = other
    return other

  def __iter__(self):
    """Subclasses should implement this to yield actual data.

    The base implementation produces no output at all.
    """
    return iter(())

class sh(cmd):
  """Execute a command using the system shell.

  Uses its stdin as the process' stdin, and yields
  its stdout.

  A string command is run through the shell; a list command is executed
  directly. The process is started lazily, the first time the command is
  iterated (or when a downstream sh needs its output pipe).
  """

  def __init__(self, command):
    """Remember the command (a string or an argument list) to run."""
    self.cmd = command
    self.proc = None

  def __repr__(self):
    # Wrap in a tuple so that a tuple-valued command cannot be mistaken
    # for the format arguments.
    return "sh('%s')" % (self.cmd,)

  def _ensureRunning(self):
    """Start the subprocess if it has not been started yet."""
    if self.proc is not None:
      return
    # Strings go through the shell, argument lists are executed directly.
    shell = isinstance(self.cmd, basestring)
    stdin = None
    if self.stdin is not None:
      if isinstance(self.stdin, sh):
        # Two processes in a row: connect the upstream process' stdout
        # straight to our stdin, starting the upstream process if needed.
        self.stdin._ensureRunning()
        stdin = self.stdin.proc.stdout
      else:
        # An in-process command: we feed its output through a pipe
        # ourselves, see _pumpStdin().
        stdin = PIPE
    #TODO: proper stderr handling
    self.proc = Popen(self.cmd, stdin=stdin, stdout=PIPE, stderr=self.stderr,
                      shell=shell)
    self._pumpStdin()

  def _pumpStdin(self):
    """Feed the output of an in-process upstream command to the process.

    Does nothing unless the process was started with a stdin pipe, which
    only happens when the upstream command is not itself an sh.
    """
    if self.stdin is None or self.proc is None or self.proc.stdin is None:
      return
    # We'll spawn a thread here to pump our stdin for data.
    # Is this safe? Probably not, but I don't think there's
    # an event-based way to deal with I/O in the subprocess module.
    def run():
      try:
        for line in self.stdin:
          self.proc.stdin.write(line)
      finally:
        # Closing the pipe signals end-of-input to the process, even if
        # the upstream command raised part-way through.
        self.proc.stdin.close()
    Thread(target=run).start()

  def __iter__(self):
    """Start the process if necessary and yield its stdout line by line."""
    self._ensureRunning()
    if self.proc:
      for line in self.proc.stdout:
        yield line

class sort(cmd):
  """An incomplete implementation of sort(1).

  Yields the contents of its stdin sorted in alphanumeric order.
  Yields nothing when it has no stdin.
  """

  def __iter__(self):
    if self.stdin is not None:
      # sorted() has to consume all of stdin before the first line
      # can be yielded.
      for x in sorted(self.stdin):
        yield x

class uniq(cmd):
  """An incomplete implementation of uniq(1).

  Yields its stdin with adjacent matching lines suppressed.
  Yields nothing when it has no stdin.
  """

  def __iter__(self):
    if self.stdin is not None:
      # Only the previously seen line is remembered, so duplicates that
      # are not adjacent are all passed through.
      prev = None
      for x in self.stdin:
        if x != prev:
          prev = x
          yield x

class cut(cmd):
  """An incomplete implementation of cut(1).

  Yields, for each line of its stdin, the selected delimiter-separated
  fields joined by the delimiter. Fields are numbered starting at 1.

  For example, `| cut -f1 -d,` becomes:
  | cut(f=1, d=',')
  and `| cut -f1,2 -d,` becomes:
  | cut(f=(1,2), d=',')
  """

  def __init__(self, d='\t', f=None):
    """Select the fields to keep.

    d is the field delimiter (a tab by default). f is a single field
    number, or a tuple or list of field numbers.

    Raises Exception if f is not given.
    """
    if f is None:
      raise Exception("You must specify a delimiter and a field")
    self.delim = d
    if isinstance(f, (tuple, list)):
      self.fields = tuple(f)
    else:
      self.fields = (int(f),)

  def __iter__(self):
    if self.stdin is not None:
      for line in self.stdin:
        # Drop the line ending before splitting so that it does not end
        # up glued to the last field; it is added back below.
        bits = line.rstrip('\r\n').split(self.delim)
        picked = [x for (i, x) in enumerate(bits, 1) if i in self.fields]
        yield self.delim.join(picked) + "\n"

class cat(cmd):
  """An incomplete implementation of cat(1).

  Yields the contents of files or iterables provided as arguments,
  or the contents of its stdin if no arguments are provided.

  Each argument is handled in order: '-' stands for stdin, any other
  string is treated as a filename, and anything else is iterated over,
  with each item formatted as one line of output.
  """

  def __init__(self, *args):
    if len(args) == 0:
      # default to stdin
      self.args = ['-']
    else:
      self.args = args

  def __iter__(self):
    for arg in self.args:
      if arg == '-':
        # Nothing to read when nothing has been piped into us.
        if self.stdin is not None:
          for line in self.stdin:
            yield line
      elif isinstance(arg, basestring):
        # assume it's a filename
        with open(arg, 'r') as f:
          for line in f:
            yield line
      else:
        # assume it's iterable
        for x in arg:
          # Wrap in a tuple so that tuple items are formatted whole
          # instead of being taken as the format arguments.
          yield "%s\n" % (x,)

if __name__ == '__main__':
  import sys
  # sample usage. yes, we could do this all in-process with cat() and sort(),
  # but this shows that we can pipe to and from shell commands
  for line in sh("cat /tmp/junk2") | cut(d=',',f=1) | 'sort' | uniq: