1 import cherrypy
2 from cherrypy._cpcompat import unicodestr
5
6 created = 0
7 datachunk = u'butternut squash' * 256
8
9 @classmethod
12
13 @classmethod
16
18
20 self.incr()
21 try:
22 for i in range(1024):
23 yield self.datachunk
24 finally:
25 self.decr()
26
57
62
64
65
66 - def close(self, somearg):
68
71
72 from cherrypy.test import helper
74
75 @staticmethod
77
78 class Root(object):
79
80 @cherrypy.expose
81 def count(self, clsname):
82 cherrypy.response.headers['Content-Type'] = 'text/plain'
83 return unicodestr(globals()[clsname].created)
84
85 @cherrypy.expose
86 def getall(self, clsname):
87 cherrypy.response.headers['Content-Type'] = 'text/plain'
88 return globals()[clsname]()
89
90 @cherrypy.expose
91 def stream(self, clsname):
92 return self.getall(clsname)
93 stream._cp_config = {'response.stream': True}
94
95 cherrypy.tree.mount(Root())
96
98 if cherrypy.server.protocol_version != "HTTP/1.1":
99 return self.skip()
100
101 self.PROTOCOL = "HTTP/1.1"
102
103
104 closables = ['OurClosableIterator', 'OurGenerator']
105 unclosables = ['OurUnclosableIterator', 'OurNotClosableIterator']
106 all_classes = closables + unclosables
107
108 import random
109 random.shuffle(all_classes)
110
111 for clsname in all_classes:
112 self.getPage("/count/" + clsname)
113 self.assertStatus(200)
114 self.assertBody('0')
115
116
117
118
119 for clsname in all_classes:
120 itr_conn = self.get_conn()
121 itr_conn.putrequest("GET", "/getall/" + clsname)
122 itr_conn.endheaders()
123 response = itr_conn.getresponse()
124 self.assertEqual(response.status, 200)
125 headers = response.getheaders()
126 for header_name, header_value in headers:
127 if header_name.lower() == 'content-length':
128 assert header_value == unicodestr(1024 * 16 * 256), header_value
129 break
130 else:
131 raise AssertionError('No Content-Length header found')
132
133
134
135
136 self.getPage("/count/" + clsname)
137 self.assertStatus(200)
138 self.assertBody('0')
139
140
141
142 stream_counts = {}
143 for clsname in all_classes:
144 itr_conn = self.get_conn()
145 itr_conn.putrequest("GET", "/stream/" + clsname)
146 itr_conn.endheaders()
147 response = itr_conn.getresponse()
148 self.assertEqual(response.status, 200)
149 response.fp.read(65536)
150
151
152 self.getPage("/count/" + clsname)
153 self.assertBody('1')
154
155
156
157 itr_conn.close()
158 self.getPage("/count/" + clsname)
159
160
161
162
163 if clsname in closables:
164
165
166
167
168 if self.body != '0':
169 import time
170 time.sleep(0.1)
171 self.getPage("/count/" + clsname)
172
173 stream_counts[clsname] = int(self.body)
174
175
176
177 for clsname in closables:
178 assert stream_counts[clsname] == 0, (
179 'did not close off stream response correctly, expected '
180 'count of zero for %s: %s' % (clsname, stream_counts)
181 )
182