WvStreams
wvhttppool.cc
1/*
2 * Worldvisions Weaver Software:
3 * Copyright (C) 1997-2002 Net Integration Technologies, Inc.
4 *
5 * A fast, easy-to-use, parallelizing, pipelining HTTP/1.1 file retriever.
6 *
7 * See wvhttppool.h.
8 */
9#include <ctype.h>
10#include <time.h>
11#include "wvhttppool.h"
12#include "wvbufstream.h"
13#include "wvtcp.h"
14#include "strutils.h"
15
16bool WvHttpStream::global_enable_pipelining = true;
17int WvUrlStream::max_requests = 100;
18
19unsigned WvHash(const WvUrlStream::Target &n)
20{
21 WvString key("%s%s", n.remaddr, n.username);
22 return (WvHash(key));
23}
24
25
26WvUrlRequest::WvUrlRequest(WvStringParm _url, WvStringParm _method,
27 WvStringParm _headers, WvStream *content_source,
28 bool _create_dirs, bool _pipeline_test)
29 : url(_url), headers(_headers)
30{
31 instream = NULL;
32 create_dirs = _create_dirs;
33 pipeline_test = _pipeline_test;
34 method = _method;
35 is_dir = false; // for ftp primarily; set later
36
37 if (pipeline_test)
38 {
39 outstream = NULL;
40 putstream = NULL;
41 }
42 else
43 {
45 outstream = x;
46 x->url = url;
47
48 putstream = content_source;
49 }
50 inuse = false;
51}
52
53
54WvUrlRequest::~WvUrlRequest()
55{
56 done();
57}
58
59
60void WvUrlRequest::done()
61{
62 if (outstream)
63 {
64 outstream->seteof();
65 outstream = NULL;
66 }
67 if (putstream)
68 putstream = NULL;
69 inuse = false;
70}
71
72
73void WvUrlStream::addurl(WvUrlRequest *url)
74{
75 log(WvLog::Debug4, "Adding a new url: '%s'\n", url->url);
76
77 assert(url->outstream);
78
79 if (!url->url.isok())
80 return;
81
82 waiting_urls.append(url, false, "waiting_url");
83 request_next();
84}
85
86
87void WvUrlStream::delurl(WvUrlRequest *url)
88{
89 log(WvLog::Debug4, "Removing an url: '%s'\n", url->url);
90
91 if (url == curl)
92 doneurl();
93 waiting_urls.unlink(url);
94 urls.unlink(url);
95}
96
97
98WvHttpPool::WvHttpPool()
99 : log("HTTP Pool", WvLog::Debug), conns(10),
100 pipeline_incompatible(50)
101{
102 log("Pool initializing.\n");
103 num_streams_created = 0;
104}
105
106
107WvHttpPool::~WvHttpPool()
108{
109 log("Created %s individual session%s during this run.\n",
110 num_streams_created, num_streams_created == 1 ? "" : "s");
111 if (geterr())
112 log("Error was: %s\n", errstr());
113
114 // these must get zapped before the URL list, since they have pointers
115 // to URLs.
116 zap();
117 conns.zap();
118}
119
120
122{
123 // log(WvLog::Debug5, "pre_select: main:%s conns:%s urls:%s\n",
124 // count(), conns.count(), urls.count());
125
127
128 WvUrlStreamDict::Iter ci(conns);
129 for (ci.rewind(); ci.next(); )
130 {
131 if (!ci->isok())
132 si.msec_timeout = 0;
133 }
134
135 WvUrlRequestList::Iter i(urls);
136 for (i.rewind(); i.next(); )
137 {
138 if (!i->instream)
139 {
140 log(WvLog::Debug4, "Checking dns for '%s'\n", i->url.gethost());
141 if (i->url.resolve())
142 si.msec_timeout = 0;
143 else
144 dns.pre_select(i->url.gethost(), si);
145 }
146 }
147}
148
149
151{
152 bool sure = false;
153
154 WvUrlStreamDict::Iter ci(conns);
155 for (ci.rewind(); ci.next(); )
156 {
157 if (!ci->isok())
158 {
159 log(WvLog::Debug4, "Selecting true because of a dead stream.\n");
160 unconnect(ci.ptr());
161 ci.rewind();
162 sure = true;
163 }
164 }
165
166 WvUrlRequestList::Iter i(urls);
167 for (i.rewind(); i.next(); )
168 {
169 if ((!i->outstream && !i->inuse) || !i->url.isok())
170 {
171 //log("'%s' is dead: %s/%s\n",
172 // i->url, i->url.isok(), i.outstream->isok());
173 if (!i->url.isok())
174 {
175 log("URL not okay: '%s'\n", i->url);
176 i->done();
177 }
178 // nicely delete the url request
179 WvUrlStream::Target target(i->url.getaddr(), i->url.getuser());
180 WvUrlStream *s = conns[target];
181 if (s)
182 s->delurl(i.ptr());
183 i.xunlink();
184 continue;
185 }
186
187 if (!i->instream)
188 {
189 log(WvLog::Debug4, "Checking dns for '%s'\n", i->url.gethost());
190 if (i->url.resolve() || dns.post_select(i->url.gethost(), si))
191 {
192 log(WvLog::Debug4, "Selecting true because of '%s'\n", i->url);
193 sure = true;
194 }
195 }
196 }
197
198 return WvIStreamList::post_select(si) || sure;
199}
200
201
203{
205
206 WvUrlRequestList::Iter i(urls);
207 for (i.rewind(); i.next(); )
208 {
209 WvUrlStream *s;
210
211 if (!i->outstream || !i->url.isok() || !i->url.resolve())
212 continue; // skip it for now
213
214 WvUrlStream::Target target(i->url.getaddr(), i->url.getuser());
215
216 //log(WvLog::Info, "remaddr is %s; username is %s\n", target.remaddr,
217 // target.username);
218 s = conns[target];
219 //if (!s) log("conn for '%s' is not found.\n", ip);
220
221 if (s && !s->isok())
222 {
223 unconnect(s);
224 s = NULL;
225 }
226
227 if (!i->outstream)
228 continue; // unconnect might have caused this URL to be marked bad
229
230 if (!s)
231 {
232 num_streams_created++;
233 if (!strncasecmp(i->url.getproto(), "http", 4))
234 s = new WvHttpStream(target.remaddr, target.username,
235 i->url.getproto() == "https",
236 pipeline_incompatible);
237 else if (!strcasecmp(i->url.getproto(), "ftp"))
238 s = new WvFtpStream(target.remaddr, target.username,
239 i->url.getpassword());
240 conns.add(s, true);
241
242 // add it to the streamlist, so it can do things
243 append(s, false, "http/ftp stream");
244 }
245
246 if (!i->instream)
247 {
248 s->addurl(i.ptr());
249 i->instream = s;
250 }
251 }
252}
253
254
255WvBufUrlStream *WvHttpPool::addurl(WvStringParm _url, WvStringParm _method,
256 WvStringParm _headers, WvStream *content_source, bool create_dirs)
257{
258 log(WvLog::Debug4, "Adding a new url to pool: '%s'\n", _url);
259 WvUrlRequest *url = new WvUrlRequest(_url, _method, _headers, content_source,
260 create_dirs, false);
261 urls.append(url, true, "addurl");
262
263 return url->outstream;
264}
265
266
267void WvHttpPool::unconnect(WvUrlStream *s)
268{
269 if (!s->target.username)
270 log("Unconnecting stream to %s.\n", s->target.remaddr);
271 else
272 log("Unconnecting stream to %s@%s.\n", s->target.username,
273 s->target.remaddr);
274
275 WvUrlRequestList::Iter i(urls);
276 for (i.rewind(); i.next(); )
277 {
278 if (i->instream == s)
279 i->instream = NULL;
280 }
281
282 unlink(s);
283 conns.remove(s);
284}
virtual int geterr() const
If isok() is false, return the system error number corresponding to the error, -1 for a special error...
Definition: wverror.h:48
A WvFastString acts exactly like a WvString, but can take (const char *) strings without needing to a...
Definition: wvstring.h:94
virtual void execute()
The callback() function calls execute(), and then calls the user-specified callback if one is defined.
Definition: wvhttppool.cc:202
virtual void pre_select(SelectInfo &si)
pre_select() sets up for eventually calling ::select().
Definition: wvhttppool.cc:121
virtual bool post_select(SelectInfo &si)
post_select() is called after ::select(), and returns true if this object is now ready.
Definition: wvhttppool.cc:150
virtual void pre_select(SelectInfo &si)
pre_select() sets up for eventually calling ::select().
virtual bool post_select(SelectInfo &si)
post_select() is called after ::select(), and returns true if this object is now ready.
virtual void execute()
The callback() function calls execute(), and then calls the user-specified callback if one is defined.
A WvLog stream accepts log messages from applications and forwards them to all registered WvLogRcv's.
Definition: wvlog.h:57
bool post_select(WvStringParm hostname, WvStream::SelectInfo &si)
determines whether the resolving process is complete.
Definition: wvresolver.cc:331
void pre_select(WvStringParm hostname, WvStream::SelectInfo &si)
add all of our waiting fds to an fd_set for use with select().
Definition: wvresolver.cc:316
virtual bool isok() const
return true if the stream is actually usable right now
Unified support for streams, that is, sequences of bytes that may or may not be ready for read/write ...
Definition: wvstream.h:25
WvString is an implementation of a simple and efficient printable-string class.
Definition: wvstring.h:330
The data structure used by pre_select()/post_select() and internally by select().
Definition: iwvstream.h:50