WvStreams
wvdbusconn.cc
1/* -*- Mode: C++ -*-
2 * Worldvisions Weaver Software:
3 * Copyright (C) 2004-2006 Net Integration Technologies, Inc.
4 *
5 * Pathfinder Software:
6 * Copyright (C) 2007, Carillon Information Security Inc.
7 *
8 * This library is licensed under the LGPL, please read LICENSE for details.
9 *
10 */
11#include "wvdbusconn.h"
12#include "wvmoniker.h"
13#include "wvstrutils.h"
14#undef interface // windows
15#include <dbus/dbus.h>
16
17
18static WvString translate(WvStringParm dbus_moniker)
19{
21 WvStringList::Iter i(l);
22
23 if (!strncasecmp(dbus_moniker, "unix:", 5))
24 {
25 WvString path, tmpdir;
26 l.split(dbus_moniker+5, ",");
27 for (i.rewind(); i.next(); )
28 {
29 if (!strncasecmp(*i, "path=", 5))
30 path = *i + 5;
31 else if (!strncasecmp(*i, "abstract=", 9))
32 path = WvString("@%s", *i + 9);
33 else if (!strncasecmp(*i, "tmpdir=", 7))
34 tmpdir = *i + 7;
35 }
36 if (!!path)
37 return WvString("unix:%s", path);
38 else if (!!tmpdir)
39 return WvString("unix:%s/dbus.sock", tmpdir);
40 }
41 else if (!strncasecmp(dbus_moniker, "tcp:", 4))
42 {
43 WvString host, port, family;
44 l.split(dbus_moniker+4, ",");
45 for (i.rewind(); i.next(); )
46 {
47 if (!strncasecmp(*i, "family=", 7))
48 family = *i + 7;
49 else if (!strncasecmp(*i, "host=", 5))
50 host = *i + 5;
51 else if (!strncasecmp(*i, "port=", 5))
52 port = *i + 5;
53 }
54 if (!!host && !!port)
55 return WvString("tcp:%s:%s", host, port);
56 else if (!!host)
57 return WvString("tcp:%s", host);
58 else if (!!port)
59 return WvString("tcp:0.0.0.0:%s", port); // localhost
60 }
61
62 return dbus_moniker; // unrecognized
63}
64
65
66static IWvStream *stream_creator(WvStringParm _s, IObject *)
67{
68 WvString s(_s);
69
70 if (!strcasecmp(s, "starter"))
71 {
72 WvString startbus(getenv("DBUS_STARTER_ADDRESS"));
73 if (!!startbus)
74 return IWvStream::create(translate(startbus));
75 else
76 {
77 WvString starttype(getenv("DBUS_STARTER_BUS_TYPE"));
78 if (!!starttype && !strcasecmp(starttype, "system"))
79 s = "system";
80 else if (!!starttype && !strcasecmp(starttype, "session"))
81 s = "session";
82 }
83 }
84
85 if (!strcasecmp(s, "system"))
86 {
87 // NOTE: the environment variable for the address of the system
88 // bus is very often not set-- in that case, look in your dbus
89 // system bus config file (e.g. /etc/dbus-1/system.conf) for the
90 // raw address and either set this environment variable to that, or
91 // pass in the address directly
92 WvString bus(getenv("DBUS_SYSTEM_BUS_ADDRESS"));
93 if (!!bus)
94 return IWvStream::create(translate(bus));
95 }
96
97 if (!strcasecmp(s, "session"))
98 {
99 WvString bus(getenv("DBUS_SESSION_BUS_ADDRESS"));
100 if (!!bus)
101 return IWvStream::create(translate(bus));
102 }
103
104 return IWvStream::create(translate(s));
105}
106
107static WvMoniker<IWvStream> reg("dbus", stream_creator);
108
109
110static int conncount;
111
112WvDBusConn::WvDBusConn(IWvStream *_cloned, IWvDBusAuth *_auth, bool _client)
113 : WvStreamClone(_cloned),
114 log(WvString("DBus %s%s",
115 _client ? "" : "s",
116 ++conncount), WvLog::Debug5),
117 pending(10)
118{
119 init(_auth, _client);
120}
121
122
123WvDBusConn::WvDBusConn(WvStringParm moniker, IWvDBusAuth *_auth, bool _client)
124 : WvStreamClone(IWvStream::create(moniker)),
125 log(WvString("DBus %s%s",
126 _client ? "" : "s",
127 ++conncount), WvLog::Debug5),
128 pending(10)
129{
130 log("Connecting to '%s'\n", moniker);
131 init(_auth, _client);
132}
133
134
135void WvDBusConn::init(IWvDBusAuth *_auth, bool _client)
136{
137 log("Initializing.\n");
138 client = _client;
139 auth = _auth ? _auth : new WvDBusClientAuth;
140 authorized = in_post_select = false;
141 if (!client) set_uniquename(WvString(":%s.0", conncount));
142
143 if (!isok()) return;
144
145 delay_output(true);
146
147 // this will get enqueued until later, but we want to make sure it
148 // comes before anything the user tries to send - including anything
149 // goofy they enqueue in the authorization part.
150 if (client)
151 send_hello();
152
153 try_auth();
154}
155
157{
158 log("Shutting down.\n");
159 if (geterr())
160 log("Error was: %s\n", errstr());
161
162 close();
163
164 delete auth;
165}
166
167
169{
170 if (!closed)
171 log("Closing.\n");
173}
174
175
177{
178 return _uniquename;
179}
180
181
182void WvDBusConn::request_name(WvStringParm name, const WvDBusCallback &onreply,
183 time_t msec_timeout)
184{
185 uint32_t flags = (DBUS_NAME_FLAG_ALLOW_REPLACEMENT |
186 DBUS_NAME_FLAG_REPLACE_EXISTING);
187 WvDBusMsg msg("org.freedesktop.DBus", "/org/freedesktop/DBus",
188 "org.freedesktop.DBus", "RequestName");
189 msg.append(name).append(flags);
190 send(msg, onreply, msec_timeout);
191}
192
193
195{
196 msg.marshal(out_queue);
197 if (authorized)
198 {
199 log(" >> %s\n", msg);
200 write(out_queue);
201 }
202 else
203 log(" .> %s\n", msg);
204 return msg.get_serial();
205}
206
207
208void WvDBusConn::send(WvDBusMsg msg, const WvDBusCallback &onreply,
209 time_t msec_timeout)
210{
211 send(msg);
212 if (onreply)
213 add_pending(msg, onreply, msec_timeout);
214}
215
216
218{
219public:
220 WvDBusMsg *reply;
221
223 { reply = NULL; }
225 { delete reply; }
226 bool reply_wait(WvDBusMsg &msg)
227 { reply = new WvDBusMsg(msg); return true; }
228};
229
230
232 wv::function<void(uint32_t)> serial_cb)
233{
234 xxReplyWaiter rw;
235
236 send(msg, wv::bind(&xxReplyWaiter::reply_wait, &rw, _1),
237 msec_timeout);
238 if (serial_cb)
239 serial_cb(msg.get_serial());
240 while (!rw.reply && isok())
241 runonce();
242 if (!rw.reply)
243 return WvDBusError(msg, DBUS_ERROR_FAILED,
244 WvString("Connection closed (%s) "
245 "while waiting for reply.",
246 errstr()));
247 else
248 return *rw.reply;
249}
250
251
252void WvDBusConn::out(WvStringParm s)
253{
254 log(" >> %s", s);
255 print(s);
256}
257
258
259const char *WvDBusConn::in()
260{
261 const char *s = trim_string(getline(0));
262 if (s)
263 log("<< %s\n", s);
264 return s;
265}
266
267
268void WvDBusConn::send_hello()
269{
270 WvDBusMsg msg("org.freedesktop.DBus", "/org/freedesktop/DBus",
271 "org.freedesktop.DBus", "Hello");
272 send(msg, wv::bind(&WvDBusConn::_registered, this, _1));
273 WvDBusMsg msg2("org.freedesktop.DBus", "/org/freedesktop/DBus",
274 "org.freedesktop.DBus", "AddMatch");
275 msg2.append("type='signal'");
276 send(msg2); // don't need to monitor this for completion
277}
278
279
280void WvDBusConn::set_uniquename(WvStringParm s)
281{
282 // we want to print the message before switching log.app, so that we
283 // can trace which log.app turned into which
284 log("Assigned name '%s'\n", s);
285 _uniquename = s;
286 log.app = WvString("DBus %s%s", client ? "" : "s", uniquename());
287}
288
289
290void WvDBusConn::try_auth()
291{
292 bool done = auth->authorize(*this);
293 if (done)
294 {
295 // ready to send messages!
296 if (out_queue.used())
297 {
298 log(" >> (sending enqueued messages)\n");
299 write(out_queue);
300 }
301
302 authorized = true;
303 }
304}
305
306
307void WvDBusConn::add_callback(CallbackPri pri, WvDBusCallback cb, void *cookie)
308{
309 callbacks.append(new CallbackInfo(pri, cb, cookie), true);
310}
311
312
313void WvDBusConn::del_callback(void *cookie)
314{
315 // remember, there might be more than one callback with the same cookie.
316 CallbackInfoList::Iter i(callbacks);
317 for (i.rewind(); i.next(); )
318 if (i->cookie == cookie)
319 i.xunlink();
320}
321
322
323int WvDBusConn::priority_order(const CallbackInfo *a, const CallbackInfo *b)
324{
325 return a->pri - b->pri;
326}
327
329{
330 log("<< %s\n", msg);
331
332 // handle replies
333 uint32_t rserial = msg.get_replyserial();
334 if (rserial)
335 {
336 Pending *p = pending[rserial];
337 if (p)
338 {
339 p->cb(msg);
340 pending.remove(p);
341 return true; // handled it
342 }
343 }
344
345 // handle all the generic filters
346 CallbackInfoList::Sorter i(callbacks, priority_order);
347 for (i.rewind(); i.next(); )
348 {
349 bool handled = i->cb(msg);
350 if (handled) return true;
351 }
352
353 return false; // couldn't handle the message, sorry
354}
355
356
357WvDBusClientAuth::WvDBusClientAuth()
358{
359 sent_request = false;
360}
361
362
363wvuid_t WvDBusClientAuth::get_uid()
364{
365 return wvgetuid();
366}
367
368
370{
371 if (!sent_request)
372 {
373 c.write("\0", 1);
374 WvString uid = get_uid();
375 c.out("AUTH EXTERNAL %s\r\n\0", WvHexEncoder().strflushstr(uid));
376 sent_request = true;
377 }
378 else
379 {
380 const char *line = c.in();
381 if (line)
382 {
383 if (!strncasecmp(line, "OK ", 3))
384 {
385 c.out("BEGIN\r\n");
386 return true;
387 }
388 else if (!strncasecmp(line, "ERROR ", 6))
389 c.seterr("Auth failed: %s", line);
390 else
391 c.seterr("Unknown AUTH response: '%s'", line);
392 }
393 }
394
395 return false;
396}
397
398
399time_t WvDBusConn::mintimeout_msec()
400{
401 WvTime when = 0;
402 PendingDict::Iter i(pending);
403 for (i.rewind(); i.next(); )
404 {
405 if (!when || when > i->valid_until)
406 when = i->valid_until;
407 }
408 if (!when)
409 return -1;
410 else if (when <= wvstime())
411 return 0;
412 else
413 return msecdiff(when, wvstime());
414}
415
416
417bool WvDBusConn::post_select(SelectInfo &si)
418{
419 bool ready = WvStreamClone::post_select(si);
420 if (si.inherit_request) return ready;
421
422 if (in_post_select) return false;
423 in_post_select = true;
424
425 if (!authorized && ready)
426 try_auth();
427
428 if (!alarm_remaining())
429 {
430 WvTime now = wvstime();
431 PendingDict::Iter i(pending);
432 for (i.rewind(); i.next(); )
433 {
434 if (now > i->valid_until)
435 {
436 log("Expiring %s\n", i->msg);
437 expire_pending(i.ptr());
438 i.rewind();
439 }
440 }
441 }
442
443 if (authorized && ready)
444 {
445 // put this in a loop so that wvdbusd can forward packets rapidly.
446 // Otherwise TCP_NODELAY kicks in, because we do a select() loop
447 // between packets, which causes delay_output() to flush.
448 bool ran;
449 do
450 {
451 ran = false;
452 size_t needed = WvDBusMsg::demarshal_bytes_needed(in_queue);
453 size_t amt = needed - in_queue.used();
454 if (amt < 4096)
455 amt = 4096;
456 read(in_queue, amt);
457 WvDBusMsg *m;
458 while ((m = WvDBusMsg::demarshal(in_queue)) != NULL)
459 {
460 ran = true;
461 filter_func(*m);
462 delete m;
463 }
464 } while (ran);
465 }
466
467 alarm(mintimeout_msec());
468 in_post_select = false;
469 return false;
470}
471
472
474{
475 return !out_queue.used() && pending.isempty();
476}
477
478
479void WvDBusConn::expire_pending(Pending *p)
480{
481 if (p)
482 {
483 WvDBusCallback xcb(p->cb);
484 pending.remove(p); // prevent accidental recursion
485 WvDBusError e(p->msg, DBUS_ERROR_FAILED,
486 "Timed out while waiting for reply");
487 xcb(e);
488 }
489}
490
491
492void WvDBusConn::cancel_pending(uint32_t serial)
493{
494 Pending *p = pending[serial];
495 if (p)
496 {
497 WvDBusCallback xcb(p->cb);
498 WvDBusMsg msg(p->msg);
499 pending.remove(p); // prevent accidental recursion
500 WvDBusError e(msg, DBUS_ERROR_FAILED,
501 "Canceled while waiting for reply");
502 xcb(e);
503 }
504}
505
506
507void WvDBusConn::add_pending(WvDBusMsg &msg, WvDBusCallback cb,
508 time_t msec_timeout)
509{
510 uint32_t serial = msg.get_serial();
511 assert(serial);
512 if (pending[serial])
513 cancel_pending(serial);
514 pending.add(new Pending(msg, cb, msec_timeout), true);
515 alarm(mintimeout_msec());
516}
517
518
519bool WvDBusConn::_registered(WvDBusMsg &msg)
520{
521 WvDBusMsg::Iter i(msg);
522 _uniquename = i.getnext().get_str();
523 set_uniquename(_uniquename);
524 return true;
525}
526
The basic interface which is included by all other XPLC interfaces and objects.
Definition: IObject.h:65
virtual bool authorize(WvDBusConn &c)=0
Main action callback.
size_t used() const
Returns the number of elements in the buffer currently available for reading.
Definition: wvbufbase.h:92
virtual bool authorize(WvDBusConn &c)
Main action callback.
Definition: wvdbusconn.cc:369
virtual void close()
Close the underlying stream.
Definition: wvdbusconn.cc:168
virtual bool filter_func(WvDBusMsg &msg)
Called for each received message.
Definition: wvdbusconn.cc:328
virtual ~WvDBusConn()
Release this connection.
Definition: wvdbusconn.cc:156
WvDBusConn(WvStringParm moniker, IWvDBusAuth *_auth=NULL, bool _client=true)
Creates a new dbus connection using the given WvStreams moniker.
Definition: wvdbusconn.cc:123
CallbackPri
The priority level of a callback registration.
Definition: wvdbusconn.h:170
WvDBusMsg send_and_wait(WvDBusMsg msg, time_t msec_timeout=WVDBUS_DEFAULT_TIMEOUT, wv::function< void(uint32_t)> serial_cb=0)
Send a message on the bus and wait for a reply to come in, returning the message when it does.
Definition: wvdbusconn.cc:231
void request_name(WvStringParm name, const WvDBusCallback &onreply=0, time_t msec_timeout=WVDBUS_DEFAULT_TIMEOUT)
Request the given service name on DBus.
Definition: wvdbusconn.cc:182
bool isidle()
Returns true if there are no outstanding messages that have not received (or timed out) their reply.
Definition: wvdbusconn.cc:473
uint32_t send(WvDBusMsg msg)
Send a message on the bus, not expecting any reply.
Definition: wvdbusconn.cc:194
void add_callback(CallbackPri pri, WvDBusCallback cb, void *cookie=NULL)
Adds a callback to the connection: all received messages will be sent to all callbacks to look at and...
Definition: wvdbusconn.cc:307
void del_callback(void *cookie)
Delete all callbacks that have the given cookie.
Definition: wvdbusconn.cc:313
WvString uniquename() const
Return this connection's unique name on the bus, assigned by the server at connect time.
Definition: wvdbusconn.cc:176
WvDBusMsg & append(const char *s)
The following methods are designed to allow appending various arguments to the message.
Definition: wvdbusmsg.cc:461
static WvDBusMsg * demarshal(WvBuf &buf)
Demarshals a new WvDBusMsg from a buffer containing its binary DBus protocol representation.
static size_t demarshal_bytes_needed(WvBuf &buf)
Given a buffer containing what might be the header of a DBus message, checks how many bytes need to b...
void marshal(WvBuf &buf)
Locks this message, encodes it in DBus binary protocol format, and adds it to the given buffer.
A WvFastString acts exactly like a WvString, but can take (const char *) strings without needing to a...
Definition: wvstring.h:94
A hex encoder.
Definition: wvhex.h:22
A WvLog stream accepts log messages from applications and forwards them to all registered WvLogRcv's.
Definition: wvlog.h:57
A type-safe version of WvMonikerBase that lets you provide create functions for object types other th...
Definition: wvmoniker.h:62
WvStreamClone simply forwards all requests to the "cloned" stream.
Definition: wvstreamclone.h:24
virtual void close()
Close this stream.
virtual bool isok() const
return true if the stream is actually usable right now
virtual bool post_select(SelectInfo &si)
post_select() is called after ::select(), and returns true if this object is now ready.
virtual int geterr() const
If isok() is false, return the system error number corresponding to the error, -1 for a special error...
void delay_output(bool is_delayed)
force write() to always buffer output.
Definition: wvstream.h:246
void alarm(time_t msec_timeout)
Set an alarm.
Definition: wvstream.cc:1049
virtual size_t write(const void *buf, size_t count)
Write data to the stream.
Definition: wvstream.cc:532
void runonce(time_t msec_timeout=-1)
Exactly the same as: if (select(timeout)) callback();.
Definition: wvstream.h:391
char * getline(time_t wait_msec=0, char separator='\n', int readahead=1024)
Read up to one line of data from the stream and return a pointer to the internal buffer containing th...
Definition: wvstream.h:175
virtual size_t read(void *buf, size_t count)
read a data block on the stream.
Definition: wvstream.cc:490
time_t alarm_remaining()
return the number of milliseconds remaining before the alarm will go off; -1 means no alarm is set (i...
Definition: wvstream.cc:1058
virtual void seterr(int _errnum)
Override seterr() from WvError so that it auto-closes the stream.
Definition: wvstream.cc:451
This is a WvList of WvStrings, and is a really handy way to parse strings.
Definition: wvstringlist.h:28
void split(WvStringParm s, const char *splitchars=" \t\r\n", int limit=0)
split s and form a list ignoring splitchars (except at beginning and end) ie.
Definition: wvstringlist.cc:19
WvString is an implementation of a simple and efficient printable-string class.
Definition: wvstring.h:330
Based on (and interchangeable with) struct timeval.
Definition: wvtimeutils.h:18
Various little string functions.
char * trim_string(char *string)
Trims whitespace from the beginning and end of the character string, including carriage return / line...
Definition: strutils.cc:59