"""RCS interface module. | |
Defines the class RCS, which represents a directory with rcs version | |
files and (possibly) corresponding work files. | |
""" | |
import fnmatch | |
import os | |
import re | |
import string | |
import tempfile | |
class RCS: | |
"""RCS interface class (local filesystem version). | |
An instance of this class represents a directory with rcs version | |
files and (possible) corresponding work files. | |
Methods provide access to most rcs operations such as | |
checkin/checkout, access to the rcs metadata (revisions, logs, | |
branches etc.) as well as some filesystem operations such as | |
listing all rcs version files. | |
XXX BUGS / PROBLEMS | |
- The instance always represents the current directory so it's not | |
very useful to have more than one instance around simultaneously | |
""" | |
# Characters allowed in work file names | |
okchars = string.ascii_letters + string.digits + '-_=+' | |
def __init__(self): | |
"""Constructor.""" | |
pass | |
def __del__(self): | |
"""Destructor.""" | |
pass | |
# --- Informational methods about a single file/revision --- | |
def log(self, name_rev, otherflags = ''): | |
"""Return the full log text for NAME_REV as a string. | |
Optional OTHERFLAGS are passed to rlog. | |
""" | |
f = self._open(name_rev, 'rlog ' + otherflags) | |
data = f.read() | |
status = self._closepipe(f) | |
if status: | |
data = data + "%s: %s" % status | |
elif data[-1] == '\n': | |
data = data[:-1] | |
return data | |
def head(self, name_rev): | |
"""Return the head revision for NAME_REV""" | |
dict = self.info(name_rev) | |
return dict['head'] | |
def info(self, name_rev): | |
"""Return a dictionary of info (from rlog -h) for NAME_REV | |
The dictionary's keys are the keywords that rlog prints | |
(e.g. 'head' and its values are the corresponding data | |
(e.g. '1.3'). | |
XXX symbolic names and locks are not returned | |
""" | |
f = self._open(name_rev, 'rlog -h') | |
dict = {} | |
while 1: | |
line = f.readline() | |
if not line: break | |
if line[0] == '\t': | |
# XXX could be a lock or symbolic name | |
# Anything else? | |
continue | |
i = string.find(line, ':') | |
if i > 0: | |
key, value = line[:i], string.strip(line[i+1:]) | |
dict[key] = value | |
status = self._closepipe(f) | |
if status: | |
raise IOError, status | |
return dict | |
# --- Methods that change files --- | |
def lock(self, name_rev): | |
"""Set an rcs lock on NAME_REV.""" | |
name, rev = self.checkfile(name_rev) | |
cmd = "rcs -l%s %s" % (rev, name) | |
return self._system(cmd) | |
def unlock(self, name_rev): | |
"""Clear an rcs lock on NAME_REV.""" | |
name, rev = self.checkfile(name_rev) | |
cmd = "rcs -u%s %s" % (rev, name) | |
return self._system(cmd) | |
def checkout(self, name_rev, withlock=0, otherflags=""): | |
"""Check out NAME_REV to its work file. | |
If optional WITHLOCK is set, check out locked, else unlocked. | |
The optional OTHERFLAGS is passed to co without | |
interpretation. | |
Any output from co goes to directly to stdout. | |
""" | |
name, rev = self.checkfile(name_rev) | |
if withlock: lockflag = "-l" | |
else: lockflag = "-u" | |
cmd = 'co %s%s %s %s' % (lockflag, rev, otherflags, name) | |
return self._system(cmd) | |
def checkin(self, name_rev, message=None, otherflags=""): | |
"""Check in NAME_REV from its work file. | |
The optional MESSAGE argument becomes the checkin message | |
(default "<none>" if None); or the file description if this is | |
a new file. | |
The optional OTHERFLAGS argument is passed to ci without | |
interpretation. | |
Any output from ci goes to directly to stdout. | |
""" | |
name, rev = self._unmangle(name_rev) | |
new = not self.isvalid(name) | |
if not message: message = "<none>" | |
if message and message[-1] != '\n': | |
message = message + '\n' | |
lockflag = "-u" | |
if new: | |
f = tempfile.NamedTemporaryFile() | |
f.write(message) | |
f.flush() | |
cmd = 'ci %s%s -t%s %s %s' % \ | |
(lockflag, rev, f.name, otherflags, name) | |
else: | |
message = re.sub(r'([\"$`])', r'\\\1', message) | |
cmd = 'ci %s%s -m"%s" %s %s' % \ | |
(lockflag, rev, message, otherflags, name) | |
return self._system(cmd) | |
# --- Exported support methods --- | |
def listfiles(self, pat = None): | |
"""Return a list of all version files matching optional PATTERN.""" | |
files = os.listdir(os.curdir) | |
files = filter(self._isrcs, files) | |
if os.path.isdir('RCS'): | |
files2 = os.listdir('RCS') | |
files2 = filter(self._isrcs, files2) | |
files = files + files2 | |
files = map(self.realname, files) | |
return self._filter(files, pat) | |
def isvalid(self, name): | |
"""Test whether NAME has a version file associated.""" | |
namev = self.rcsname(name) | |
return (os.path.isfile(namev) or | |
os.path.isfile(os.path.join('RCS', namev))) | |
def rcsname(self, name): | |
"""Return the pathname of the version file for NAME. | |
The argument can be a work file name or a version file name. | |
If the version file does not exist, the name of the version | |
file that would be created by "ci" is returned. | |
""" | |
if self._isrcs(name): namev = name | |
else: namev = name + ',v' | |
if os.path.isfile(namev): return namev | |
namev = os.path.join('RCS', os.path.basename(namev)) | |
if os.path.isfile(namev): return namev | |
if os.path.isdir('RCS'): | |
return os.path.join('RCS', namev) | |
else: | |
return namev | |
def realname(self, namev): | |
"""Return the pathname of the work file for NAME. | |
The argument can be a work file name or a version file name. | |
If the work file does not exist, the name of the work file | |
that would be created by "co" is returned. | |
""" | |
if self._isrcs(namev): name = namev[:-2] | |
else: name = namev | |
if os.path.isfile(name): return name | |
name = os.path.basename(name) | |
return name | |
def islocked(self, name_rev): | |
"""Test whether FILE (which must have a version file) is locked. | |
XXX This does not tell you which revision number is locked and | |
ignores any revision you may pass in (by virtue of using rlog | |
-L -R). | |
""" | |
f = self._open(name_rev, 'rlog -L -R') | |
line = f.readline() | |
status = self._closepipe(f) | |
if status: | |
raise IOError, status | |
if not line: return None | |
if line[-1] == '\n': | |
line = line[:-1] | |
return self.realname(name_rev) == self.realname(line) | |
def checkfile(self, name_rev): | |
"""Normalize NAME_REV into a (NAME, REV) tuple. | |
Raise an exception if there is no corresponding version file. | |
""" | |
name, rev = self._unmangle(name_rev) | |
if not self.isvalid(name): | |
raise os.error, 'not an rcs file %r' % (name,) | |
return name, rev | |
# --- Internal methods --- | |
def _open(self, name_rev, cmd = 'co -p', rflag = '-r'): | |
"""INTERNAL: open a read pipe to NAME_REV using optional COMMAND. | |
Optional FLAG is used to indicate the revision (default -r). | |
Default COMMAND is "co -p". | |
Return a file object connected by a pipe to the command's | |
output. | |
""" | |
name, rev = self.checkfile(name_rev) | |
namev = self.rcsname(name) | |
if rev: | |
cmd = cmd + ' ' + rflag + rev | |
return os.popen("%s %r" % (cmd, namev)) | |
def _unmangle(self, name_rev): | |
"""INTERNAL: Normalize NAME_REV argument to (NAME, REV) tuple. | |
Raise an exception if NAME contains invalid characters. | |
A NAME_REV argument is either NAME string (implying REV='') or | |
a tuple of the form (NAME, REV). | |
""" | |
if type(name_rev) == type(''): | |
name_rev = name, rev = name_rev, '' | |
else: | |
name, rev = name_rev | |
for c in rev: | |
if c not in self.okchars: | |
raise ValueError, "bad char in rev" | |
return name_rev | |
def _closepipe(self, f): | |
"""INTERNAL: Close PIPE and print its exit status if nonzero.""" | |
sts = f.close() | |
if not sts: return None | |
detail, reason = divmod(sts, 256) | |
if reason == 0: return 'exit', detail # Exit status | |
signal = reason&0x7F | |
if signal == 0x7F: | |
code = 'stopped' | |
signal = detail | |
else: | |
code = 'killed' | |
if reason&0x80: | |
code = code + '(coredump)' | |
return code, signal | |
def _system(self, cmd): | |
"""INTERNAL: run COMMAND in a subshell. | |
Standard input for the command is taken from /dev/null. | |
Raise IOError when the exit status is not zero. | |
Return whatever the calling method should return; normally | |
None. | |
A derived class may override this method and redefine it to | |
capture stdout/stderr of the command and return it. | |
""" | |
cmd = cmd + " </dev/null" | |
sts = os.system(cmd) | |
if sts: raise IOError, "command exit status %d" % sts | |
def _filter(self, files, pat = None): | |
"""INTERNAL: Return a sorted copy of the given list of FILES. | |
If a second PATTERN argument is given, only files matching it | |
are kept. No check for valid filenames is made. | |
""" | |
if pat: | |
def keep(name, pat = pat): | |
return fnmatch.fnmatch(name, pat) | |
files = filter(keep, files) | |
else: | |
files = files[:] | |
files.sort() | |
return files | |
def _remove(self, fn): | |
"""INTERNAL: remove FILE without complaints.""" | |
try: | |
os.unlink(fn) | |
except os.error: | |
pass | |
def _isrcs(self, name): | |
"""INTERNAL: Test whether NAME ends in ',v'.""" | |
return name[-2:] == ',v' |