| """CGI-savvy HTTP Server. | |
| This module builds on SimpleHTTPServer by implementing GET and POST | |
| requests to cgi-bin scripts. | |
| If the os.fork() function is not present (e.g. on Windows), | |
| subprocess.Popen() is used as a fallback, with slightly altered | |
| semantics.  Scripts that are not Python scripts are refused when none | |
| of os.fork(), os.popen2() or os.popen3() is available. | |
| In all cases, the implementation is intentionally naive -- all | |
| requests are executed synchronously. | |
| SECURITY WARNING: DON'T USE THIS CODE UNLESS YOU ARE INSIDE A FIREWALL | |
| -- it may execute arbitrary Python code or external programs. | |
| Note that status code 200 is sent prior to execution of a CGI script, so | |
| scripts cannot send other status codes such as 302 (redirect). | |
| """ | |
# Version string of this module.
__version__ = "0.4"

# Names exported by a star-import of this module.
__all__ = ["CGIHTTPRequestHandler"]
| import os | |
| import sys | |
| import urllib | |
| import BaseHTTPServer | |
| import SimpleHTTPServer | |
| import select | |
| import copy | |
class CGIHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):

    """Complete HTTP server with GET, HEAD and POST commands.

    GET and HEAD also support running CGI scripts.

    The POST command is *only* implemented for CGI scripts.

    """

    # Determine platform specifics
    have_fork = hasattr(os, 'fork')
    have_popen2 = hasattr(os, 'popen2')
    have_popen3 = hasattr(os, 'popen3')

    # Make rfile unbuffered -- we need to read one line and then pass
    # the rest to a subprocess, so we can't use buffered input.
    rbufsize = 0

    def do_POST(self):
        """Serve a POST request.

        This is only implemented for CGI scripts; a POST to any other
        path is answered with a 501 error.

        """
        if self.is_cgi():
            self.run_cgi()
        else:
            self.send_error(501, "Can only POST to CGI scripts")

    def send_head(self):
        """Version of send_head that supports CGI scripts.

        CGI paths are handed to run_cgi() and its result is returned;
        everything else is delegated to SimpleHTTPRequestHandler.

        """
        if self.is_cgi():
            return self.run_cgi()
        else:
            return SimpleHTTPServer.SimpleHTTPRequestHandler.send_head(self)

    def is_cgi(self):
        """Test whether self.path corresponds to a CGI script.

        Returns True and updates the cgi_info attribute to the tuple
        (dir, rest) if self.path requires running a CGI script.
        Returns False otherwise.

        If any exception is raised, the caller should assume that
        self.path was rejected as invalid and act accordingly.

        The default implementation normalizes the path component of the
        URL (the query string is set aside first) and tests whether its
        directory part is one of the strings in self.cgi_directories.
        'rest' is the final path segment followed by the original
        '?query', if any.

        NOTE: because the directory part must match exactly, scripts in
        sub-directories of a CGI directory and URLs carrying extra path
        information after the script name are not recognized here.

        """
        # The query string is not part of the path hierarchy.  Normalizing
        # it together with the path would let a '/' or '..' inside the
        # query change which directory the request appears to target.
        path, sep, query = self.path.partition('?')
        head, tail = _url_collapse_path_split(path)
        if head in self.cgi_directories:
            self.cgi_info = head, tail + sep + query
            return True
        return False

    cgi_directories = ['/cgi-bin', '/htbin']

    def is_executable(self, path):
        """Test whether argument path is an executable file."""
        return executable(path)

    def is_python(self, path):
        """Test whether argument path is a Python script."""
        extension = os.path.splitext(path)[1]
        return extension.lower() in (".py", ".pyw")

    def run_cgi(self):
        """Execute a CGI script.

        Uses self.cgi_info (set by is_cgi()) to locate the script,
        builds the CGI environment from the request, sends a 200 status
        line and then runs the script: via fork/exec when os.fork is
        available, otherwise through subprocess.  Errors detected before
        the script is started are reported with send_error().  Always
        returns None.

        """
        cgi_dir, rest = self.cgi_info
        # Filesystem location of the CGI directory matched by is_cgi();
        # the script that finally gets executed must live below it.
        cgi_root = self.translate_path(cgi_dir)

        # find an explicit query string, if present.  The first '?'
        # delimits it; later '?' characters belong to the query itself.
        rest, _, query = rest.partition('?')

        # Descend into sub-directories.  Work on the normalized path from
        # cgi_info rather than on the raw self.path, which may still hold
        # '..' segments that point outside the CGI directory.
        path = cgi_dir + '/' + rest
        i = path.find('/', len(cgi_dir) + 1)
        while i >= 0:
            nextdir = path[:i]
            nextrest = path[i+1:]

            scriptdir = self.translate_path(nextdir)
            if os.path.isdir(scriptdir):
                cgi_dir, rest = nextdir, nextrest
                i = path.find('/', len(cgi_dir) + 1)
            else:
                break

        # dissect the part after the directory name into a script name &
        # a possible additional path, to be stored in PATH_INFO.
        i = rest.find('/')
        if i >= 0:
            script, rest = rest[:i], rest[i:]
        else:
            script, rest = rest, ''

        scriptname = cgi_dir + '/' + script
        scriptfile = self.translate_path(scriptname)
        # translate_path() percent-decodes its argument, so an encoded
        # '../' in the script segment could resolve outside the CGI
        # directory.  Refuse anything that does not stay below it.
        if not os.path.join(scriptfile, '').startswith(
                os.path.join(cgi_root, '')):
            self.send_error(403, "CGI script is outside the CGI directory "
                            "(%r)" % scriptname)
            return
        if not os.path.exists(scriptfile):
            self.send_error(404, "No such CGI script (%r)" % scriptname)
            return
        if not os.path.isfile(scriptfile):
            self.send_error(403, "CGI script is not a plain file (%r)" %
                            scriptname)
            return
        ispy = self.is_python(scriptname)
        if not ispy:
            if not (self.have_fork or self.have_popen2 or self.have_popen3):
                self.send_error(403, "CGI script is not a Python script (%r)" %
                                scriptname)
                return
            if not self.is_executable(scriptfile):
                self.send_error(403, "CGI script is not executable (%r)" %
                                scriptname)
                return

        # Reference: http://hoohoo.ncsa.uiuc.edu/cgi/env.html
        # XXX Much of the following could be prepared ahead of time!
        env = copy.deepcopy(os.environ)
        env['SERVER_SOFTWARE'] = self.version_string()
        env['SERVER_NAME'] = self.server.server_name
        env['GATEWAY_INTERFACE'] = 'CGI/1.1'
        env['SERVER_PROTOCOL'] = self.protocol_version
        env['SERVER_PORT'] = str(self.server.server_port)
        env['REQUEST_METHOD'] = self.command
        uqrest = urllib.unquote(rest)
        env['PATH_INFO'] = uqrest
        env['PATH_TRANSLATED'] = self.translate_path(uqrest)
        env['SCRIPT_NAME'] = scriptname
        if query:
            env['QUERY_STRING'] = query
        host = self.address_string()
        if host != self.client_address[0]:
            env['REMOTE_HOST'] = host
        env['REMOTE_ADDR'] = self.client_address[0]
        authorization = self.headers.getheader("authorization")
        if authorization:
            authorization = authorization.split()
            if len(authorization) == 2:
                import base64
                import binascii
                env['AUTH_TYPE'] = authorization[0]
                if authorization[0].lower() == "basic":
                    try:
                        authorization = base64.decodestring(authorization[1])
                    except binascii.Error:
                        pass
                    else:
                        # Split at the first colon only: the password
                        # part of "user:password" may contain colons.
                        authorization = authorization.split(':', 1)
                        if len(authorization) == 2:
                            env['REMOTE_USER'] = authorization[0]
        # XXX REMOTE_IDENT
        if self.headers.typeheader is None:
            env['CONTENT_TYPE'] = self.headers.type
        else:
            env['CONTENT_TYPE'] = self.headers.typeheader
        length = self.headers.getheader('content-length')
        if length:
            env['CONTENT_LENGTH'] = length
        referer = self.headers.getheader('referer')
        if referer:
            env['HTTP_REFERER'] = referer
        accept = []
        for line in self.headers.getallmatchingheaders('accept'):
            if line[:1] in "\t\n\r ":
                # Continuation line of a folded header.
                accept.append(line.strip())
            else:
                # Skip the leading "Accept:" (7 characters).
                accept = accept + line[7:].split(',')
        env['HTTP_ACCEPT'] = ','.join(accept)
        ua = self.headers.getheader('user-agent')
        if ua:
            env['HTTP_USER_AGENT'] = ua
        co = filter(None, self.headers.getheaders('cookie'))
        if co:
            env['HTTP_COOKIE'] = ', '.join(co)
        # XXX Other HTTP_* headers
        # Since we're setting the env in the parent, provide empty
        # values to override previously set values
        for k in ('QUERY_STRING', 'REMOTE_HOST', 'CONTENT_LENGTH',
                  'HTTP_USER_AGENT', 'HTTP_COOKIE', 'HTTP_REFERER'):
            env.setdefault(k, "")

        # The status line goes out before the script runs, so the script
        # itself cannot choose a different status code.
        self.send_response(200, "Script output follows")

        decoded_query = query.replace('+', ' ')

        if self.have_fork:
            # Unix -- fork as we should
            args = [script]
            # A query without '=' is handed to the script as a
            # command-line argument.
            if '=' not in decoded_query:
                args.append(decoded_query)
            nobody = nobody_uid()
            self.wfile.flush()  # Always flush before forking
            pid = os.fork()
            if pid != 0:
                # Parent
                pid, sts = os.waitpid(pid, 0)
                # throw away additional data [see bug #427345]
                while select.select([self.rfile], [], [], 0)[0]:
                    if not self.rfile.read(1):
                        break
                if sts:
                    self.log_error("CGI script exit status %#x", sts)
                return
            # Child
            try:
                try:
                    os.setuid(nobody)
                except os.error:
                    pass
                os.dup2(self.rfile.fileno(), 0)
                os.dup2(self.wfile.fileno(), 1)
                os.execve(scriptfile, args, env)
            except:
                # Deliberately catch everything: the child must never
                # return into the server loop, whatever went wrong.
                self.server.handle_error(self.request, self.client_address)
                os._exit(127)

        else:
            # Non Unix - use subprocess
            import subprocess
            cmdline = [scriptfile]
            if self.is_python(scriptfile):
                interp = sys.executable
                if interp.lower().endswith("w.exe"):
                    # On Windows, use python.exe, not pythonw.exe
                    interp = interp[:-5] + interp[-4:]
                cmdline = [interp, '-u'] + cmdline
            if '=' not in query:
                cmdline.append(query)

            self.log_message("command: %s", subprocess.list2cmdline(cmdline))
            try:
                nbytes = int(length)
            except (TypeError, ValueError):
                # Missing or malformed Content-Length header.
                nbytes = 0
            p = subprocess.Popen(cmdline,
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 env=env
                                 )
            if self.command.lower() == "post" and nbytes > 0:
                data = self.rfile.read(nbytes)
            else:
                data = None
            # throw away additional data [see bug #427345]
            while select.select([self.rfile._sock], [], [], 0)[0]:
                if not self.rfile._sock.recv(1):
                    break
            stdout, stderr = p.communicate(data)
            self.wfile.write(stdout)
            if stderr:
                self.log_error('%s', stderr)
            p.stderr.close()
            p.stdout.close()
            status = p.returncode
            if status:
                self.log_error("CGI script exit status %#x", status)
            else:
                self.log_message("CGI script exited OK")
| # TODO(gregory.p.smith): Move this into an appropriate library. | |
| def _url_collapse_path_split(path): | |
| """ | |
| Given a URL path, remove extra '/'s and '.' path elements and collapse | |
| any '..' references. | |
| Implements something akin to RFC-2396 5.2 step 6 to parse relative paths. | |
| Returns: A tuple of (head, tail) where tail is everything after the final / | |
| and head is everything before it. Head will always start with a '/' and, | |
| if it contains anything else, never have a trailing '/'. | |
| Raises: IndexError if too many '..' occur within the path. | |
| """ | |
| # Similar to os.path.split(os.path.normpath(path)) but specific to URL | |
| # path semantics rather than local operating system semantics. | |
| path_parts = [] | |
| for part in path.split('/'): | |
| if part == '.': | |
| path_parts.append('') | |
| else: | |
| path_parts.append(part) | |
| # Filter out blank non trailing parts before consuming the '..'. | |
| path_parts = [part for part in path_parts[:-1] if part] + path_parts[-1:] | |
| if path_parts: | |
| tail_part = path_parts.pop() | |
| else: | |
| tail_part = '' | |
| head_parts = [] | |
| for part in path_parts: | |
| if part == '..': | |
| head_parts.pop() | |
| else: | |
| head_parts.append(part) | |
| if tail_part and tail_part == '..': | |
| head_parts.pop() | |
| tail_part = '' | |
| return ('/' + '/'.join(head_parts), tail_part) | |
# Cached result of nobody_uid(); None until the first successful lookup.
nobody = None


def nobody_uid():
    """Internal routine to get nobody's uid.

    Returns -1 when the pwd module cannot be imported.  Otherwise the uid
    of the 'nobody' account is returned, or, when no such account exists,
    one more than the largest uid in the password database.  The result
    is cached in the module-level 'nobody' variable.
    """
    global nobody
    if not nobody:
        try:
            import pwd
        except ImportError:
            return -1
        try:
            nobody = pwd.getpwnam('nobody')[2]
        except KeyError:
            # No 'nobody' account: pick a uid above every known one.
            nobody = 1 + max(entry[2] for entry in pwd.getpwall())
    return nobody
def executable(path):
    """Test for executable file.

    Returns True when 'path' can be stat()ed and at least one of its
    execute permission bits (owner, group or other) is set.  Returns
    False otherwise, including when the stat() call fails.
    """
    try:
        st = os.stat(path)
    except os.error:
        return False
    # 0o111 selects the three execute bits.  The explicit '0o' prefix is
    # accepted by Python 2.6+ and Python 3, unlike the legacy '0111' form.
    return st.st_mode & 0o111 != 0
def test(HandlerClass=CGIHTTPRequestHandler,
         ServerClass=BaseHTTPServer.HTTPServer):
    """Run SimpleHTTPServer.test() with the given handler and server classes.

    By default the CGI-capable request handler defined in this module is
    used together with BaseHTTPServer.HTTPServer.
    """
    SimpleHTTPServer.test(HandlerClass, ServerClass)
# Start the test server only when this file is executed as a script.
if __name__ == "__main__":
    test()