1 """Extensions to unittest for web frameworks.
2
3 Use the WebCase.getPage method to request a page from your HTTP server.
4
5 Framework Integration
6 =====================
7
8 If you have control over your server process, you can handle errors
9 in the server-side of the HTTP conversation a bit better. You must run
10 both the client (your WebCase tests) and the server in the same process
11 (but in separate threads, obviously).
12
13 When an error occurs in the framework, call server_error. It will print
14 the traceback to stdout, and keep any assertions you have from running
15 (the assumption is that, if the server errors, the page output will not
16 be of further significance to your tests).
17 """
18
19 import pprint
20 import re
21 import socket
22 import sys
23 import time
24 import traceback
25 import types
26
27 from unittest import *
28 from unittest import _TextTestResult
29
30 from cherrypy._cpcompat import basestring, ntob, py3k, HTTPConnection
31 from cherrypy._cpcompat import HTTPSConnection, unicodestr
32
33
35 """Return an IP address for a client connection given the server host.
36
37 If the server is listening on '0.0.0.0' (INADDR_ANY)
38 or '::' (IN6ADDR_ANY), this will return the proper localhost."""
39 if host == '0.0.0.0':
40
41 return "127.0.0.1"
42 if host == '::':
43
44 return "::1"
45 return host
46
47
49
51
52 if self.errors or self.failures:
53 if self.dots or self.showAll:
54 self.stream.writeln()
55 self.printErrorList('ERROR', self.errors)
56 self.printErrorList('FAIL', self.failures)
57
58
60
61 """A test runner class that displays results in textual form."""
62
65
66 - def run(self, test):
67 "Run the given test case or test suite."
68
69 result = self._makeResult()
70 test(result)
71 result.printErrors()
72 if not result.wasSuccessful():
73 self.stream.write("FAILED (")
74 failed, errored = list(map(len, (result.failures, result.errors)))
75 if failed:
76 self.stream.write("failures=%d" % failed)
77 if errored:
78 if failed:
79 self.stream.write(", ")
80 self.stream.write("errors=%d" % errored)
81 self.stream.writeln(")")
82 return result
83
84
86
88 """Return a suite of all tests cases given a string specifier.
89
90 The name may resolve either to a module, a test case class, a
91 test method within a test case class, or a callable object which
92 returns a TestCase or TestSuite instance.
93
94 The method optionally resolves the names relative to a given module.
95 """
96 parts = name.split('.')
97 unused_parts = []
98 if module is None:
99 if not parts:
100 raise ValueError("incomplete test name: %s" % name)
101 else:
102 parts_copy = parts[:]
103 while parts_copy:
104 target = ".".join(parts_copy)
105 if target in sys.modules:
106 module = reload(sys.modules[target])
107 parts = unused_parts
108 break
109 else:
110 try:
111 module = __import__(target)
112 parts = unused_parts
113 break
114 except ImportError:
115 unused_parts.insert(0, parts_copy[-1])
116 del parts_copy[-1]
117 if not parts_copy:
118 raise
119 parts = parts[1:]
120 obj = module
121 for part in parts:
122 obj = getattr(obj, part)
123
124 if isinstance(obj, types.ModuleType):
125 return self.loadTestsFromModule(obj)
126 elif (((py3k and isinstance(obj, type))
127 or isinstance(obj, (type, types.ClassType)))
128 and issubclass(obj, TestCase)):
129 return self.loadTestsFromTestCase(obj)
130 elif isinstance(obj, types.UnboundMethodType):
131 if py3k:
132 return obj.__self__.__class__(obj.__name__)
133 else:
134 return obj.im_class(obj.__name__)
135 elif hasattr(obj, '__call__'):
136 test = obj()
137 if not isinstance(test, TestCase) and \
138 not isinstance(test, TestSuite):
139 raise ValueError("calling %s returned %s, "
140 "not a test" % (obj, test))
141 return test
142 else:
143 raise ValueError("do not know how to make test from: %s" % obj)
144
145
146 try:
147
148 if sys.platform[:4] == 'java':
150
151 return sys.stdin.read(1)
152 else:
153
154 import msvcrt
155
157 return msvcrt.getch()
158 except ImportError:
159
160 import tty
161 import termios
162
164 fd = sys.stdin.fileno()
165 old_settings = termios.tcgetattr(fd)
166 try:
167 tty.setraw(sys.stdin.fileno())
168 ch = sys.stdin.read(1)
169 finally:
170 termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
171 return ch
172
173
175 HOST = "127.0.0.1"
176 PORT = 8000
177 HTTP_CONN = HTTPConnection
178 PROTOCOL = "HTTP/1.1"
179
180 scheme = "http"
181 url = None
182
183 status = None
184 headers = None
185 body = None
186
187 encoding = 'utf-8'
188
189 time = None
190
202
204 """Make our HTTP_CONN persistent (or not).
205
206 If the 'on' argument is True (the default), then self.HTTP_CONN
207 will be set to an instance of HTTPConnection (or HTTPS
208 if self.scheme is "https"). This will then persist across requests.
209
210 We only allow for a single open connection, so if you call this
211 and we currently have an open connection, it will be closed.
212 """
213 try:
214 self.HTTP_CONN.close()
215 except (TypeError, AttributeError):
216 pass
217
218 if on:
219 self.HTTP_CONN = self.get_conn(auto_open=auto_open)
220 else:
221 if self.scheme == "https":
222 self.HTTP_CONN = HTTPSConnection
223 else:
224 self.HTTP_CONN = HTTPConnection
225
227 return hasattr(self.HTTP_CONN, "__class__")
228
231 persistent = property(_get_persistent, _set_persistent)
232
234 """Return an IP address for a client connection.
235
236 If the server is listening on '0.0.0.0' (INADDR_ANY)
237 or '::' (IN6ADDR_ANY), this will return the proper localhost."""
238 return interface(self.HOST)
239
240 - def getPage(self, url, headers=None, method="GET", body=None,
241 protocol=None):
242 """Open the url with debugging support. Return status, headers, body.
243 """
244 ServerError.on = False
245
246 if isinstance(url, unicodestr):
247 url = url.encode('utf-8')
248 if isinstance(body, unicodestr):
249 body = body.encode('utf-8')
250
251 self.url = url
252 self.time = None
253 start = time.time()
254 result = openURL(url, headers, method, body, self.HOST, self.PORT,
255 self.HTTP_CONN, protocol or self.PROTOCOL)
256 self.time = time.time() - start
257 self.status, self.headers, self.body = result
258
259
260 self.cookies = [('Cookie', v) for k, v in self.headers
261 if k.lower() == 'set-cookie']
262
263 if ServerError.on:
264 raise ServerError()
265 return result
266
267 interactive = True
268 console_height = 30
269
271 print("")
272 print(" ERROR: %s" % msg)
273
274 if not self.interactive:
275 raise self.failureException(msg)
276
277 p = (" Show: "
278 "[B]ody [H]eaders [S]tatus [U]RL; "
279 "[I]gnore, [R]aise, or sys.e[X]it >> ")
280 sys.stdout.write(p)
281 sys.stdout.flush()
282 while True:
283 i = getchar().upper()
284 if not isinstance(i, type("")):
285 i = i.decode('ascii')
286 if i not in "BHSUIRX":
287 continue
288 print(i.upper())
289 if i == "B":
290 for x, line in enumerate(self.body.splitlines()):
291 if (x + 1) % self.console_height == 0:
292
293 sys.stdout.write("<-- More -->\r")
294 m = getchar().lower()
295
296 sys.stdout.write(" \r")
297 if m == "q":
298 break
299 print(line)
300 elif i == "H":
301 pprint.pprint(self.headers)
302 elif i == "S":
303 print(self.status)
304 elif i == "U":
305 print(self.url)
306 elif i == "I":
307
308 return
309 elif i == "R":
310 raise self.failureException(msg)
311 elif i == "X":
312 self.exit()
313 sys.stdout.write(p)
314 sys.stdout.flush()
315
318
347
349 """Fail if (key, [value]) not in self.headers."""
350 lowkey = key.lower()
351 for k, v in self.headers:
352 if k.lower() == lowkey:
353 if value is None or str(value) == v:
354 return v
355
356 if msg is None:
357 if value is None:
358 msg = '%r not in headers' % key
359 else:
360 msg = '%r:%r not in headers' % (key, value)
361 self._handlewebError(msg)
362
364 """Fail if header indicated by key doesn't have one of the values."""
365 lowkey = key.lower()
366 for k, v in self.headers:
367 if k.lower() == lowkey:
368 matches = [value for value in values if str(value) == v]
369 if matches:
370 return matches
371
372 if msg is None:
373 msg = '%(key)r not in %(values)r' % vars()
374 self._handlewebError(msg)
375
377 """Fail if the header does not contain the specified value"""
378 actual_value = self.assertHeader(key, msg=msg)
379 header_values = map(str.strip, actual_value.split(','))
380 if value in header_values:
381 return value
382
383 if msg is None:
384 msg = "%r not in %r" % (value, header_values)
385 self._handlewebError(msg)
386
388 """Fail if key in self.headers."""
389 lowkey = key.lower()
390 matches = [k for k, v in self.headers if k.lower() == lowkey]
391 if matches:
392 if msg is None:
393 msg = '%r in headers' % key
394 self._handlewebError(msg)
395
396 - def assertBody(self, value, msg=None):
397 """Fail if value != self.body."""
398 if isinstance(value, unicodestr):
399 value = value.encode(self.encoding)
400 if value != self.body:
401 if msg is None:
402 msg = 'expected body:\n%r\n\nactual body:\n%r' % (
403 value, self.body)
404 self._handlewebError(msg)
405
406 - def assertInBody(self, value, msg=None):
407 """Fail if value not in self.body."""
408 if isinstance(value, unicodestr):
409 value = value.encode(self.encoding)
410 if value not in self.body:
411 if msg is None:
412 msg = '%r not in body: %s' % (value, self.body)
413 self._handlewebError(msg)
414
415 - def assertNotInBody(self, value, msg=None):
416 """Fail if value in self.body."""
417 if isinstance(value, unicodestr):
418 value = value.encode(self.encoding)
419 if value in self.body:
420 if msg is None:
421 msg = '%r found in body' % value
422 self._handlewebError(msg)
423
424 - def assertMatchesBody(self, pattern, msg=None, flags=0):
425 """Fail if value (a regex pattern) is not in self.body."""
426 if isinstance(pattern, unicodestr):
427 pattern = pattern.encode(self.encoding)
428 if re.search(pattern, self.body, flags) is None:
429 if msg is None:
430 msg = 'No match for %r in body' % pattern
431 self._handlewebError(msg)
432
433
434 methods_with_bodies = ("POST", "PUT")
435
436
438 """Return request headers, with required headers added (if missing)."""
439 if headers is None:
440 headers = []
441
442
443
444 found = False
445 for k, v in headers:
446 if k.lower() == 'host':
447 found = True
448 break
449 if not found:
450 if port == 80:
451 headers.append(("Host", host))
452 else:
453 headers.append(("Host", "%s:%s" % (host, port)))
454
455 if method in methods_with_bodies:
456
457 found = False
458 for k, v in headers:
459 if k.lower() == 'content-type':
460 found = True
461 break
462 if not found:
463 headers.append(
464 ("Content-Type", "application/x-www-form-urlencoded"))
465 headers.append(("Content-Length", str(len(body or ""))))
466
467 return headers
468
469
471 """Return status, headers, body the way we like from a response."""
472 if py3k:
473 h = response.getheaders()
474 else:
475 h = []
476 key, value = None, None
477 for line in response.msg.headers:
478 if line:
479 if line[0] in " \t":
480 value += line.strip()
481 else:
482 if key and value:
483 h.append((key, value))
484 key, value = line.split(":", 1)
485 key = key.strip()
486 value = value.strip()
487 if key and value:
488 h.append((key, value))
489
490 return "%s %s" % (response.status, response.reason), h, response.read()
491
492
493 -def openURL(url, headers=None, method="GET", body=None,
494 host="127.0.0.1", port=8000, http_conn=HTTPConnection,
495 protocol="HTTP/1.1"):
496 """Open the given HTTP resource and return status, headers, and body."""
497
498 headers = cleanHeaders(headers, method, body, host, port)
499
500
501
502 for trial in range(10):
503 try:
504
505 if hasattr(http_conn, "host"):
506 conn = http_conn
507 else:
508 conn = http_conn(interface(host), port)
509
510 conn._http_vsn_str = protocol
511 conn._http_vsn = int("".join([x for x in protocol if x.isdigit()]))
512
513
514 if sys.version_info < (2, 4):
515 def putheader(self, header, value):
516 if header == 'Accept-Encoding' and value == 'identity':
517 return
518 self.__class__.putheader(self, header, value)
519 import new
520 conn.putheader = new.instancemethod(
521 putheader, conn, conn.__class__)
522 conn.putrequest(method.upper(), url, skip_host=True)
523 elif not py3k:
524 conn.putrequest(method.upper(), url, skip_host=True,
525 skip_accept_encoding=True)
526 else:
527 import http.client
528
529
530 def putrequest(self, method, url):
531 if (
532 self._HTTPConnection__response and
533 self._HTTPConnection__response.isclosed()
534 ):
535 self._HTTPConnection__response = None
536
537 if self._HTTPConnection__state == http.client._CS_IDLE:
538 self._HTTPConnection__state = (
539 http.client._CS_REQ_STARTED)
540 else:
541 raise http.client.CannotSendRequest()
542
543 self._method = method
544 if not url:
545 url = ntob('/')
546 request = ntob(' ').join(
547 (method.encode("ASCII"),
548 url,
549 self._http_vsn_str.encode("ASCII")))
550 self._output(request)
551 import types
552 conn.putrequest = types.MethodType(putrequest, conn)
553
554 conn.putrequest(method.upper(), url)
555
556 for key, value in headers:
557 conn.putheader(key, value.encode("Latin-1"))
558 conn.endheaders()
559
560 if body is not None:
561 conn.send(body)
562
563
564 response = conn.getresponse()
565
566 s, h, b = shb(response)
567
568 if not hasattr(http_conn, "host"):
569
570 conn.close()
571
572 return s, h, b
573 except socket.error:
574 time.sleep(0.5)
575 if trial == 9:
576 raise
577
578
579
580
581 ignored_exceptions = []
582
583
584
585
586 ignore_all = False
587
588
591
592
594 """Server debug hook. Return True if exception handled, False if ignored.
595
596 You probably want to wrap this, so you can still handle an error using
597 your framework when it's ignored.
598 """
599 if exc is None:
600 exc = sys.exc_info()
601
602 if ignore_all or exc[0] in ignored_exceptions:
603 return False
604 else:
605 ServerError.on = True
606 print("")
607 print("".join(traceback.format_exception(*exc)))
608 return True
609