Connection.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 #ifndef _CONNECTION_HH_
18 #define _CONNECTION_HH_
19 
20 #include <tbb/task.h>
21 #include <google/protobuf/message.h>
22 
23 // This fixes compiler warnings, see #3147 and #3160
24 #ifndef BOOST_BIND_GLOBAL_PLACEHOLDERS
25 #define BOOST_BIND_GLOBAL_PLACEHOLDERS
26 #endif
27 #include <boost/asio.hpp>
28 #include <boost/bind.hpp>
29 #include <boost/function.hpp>
30 #include <boost/thread.hpp>
31 #include <boost/tuple/tuple.hpp>
32 
33 #include <string>
34 #include <vector>
35 #include <iostream>
36 #include <iomanip>
37 #include <deque>
38 #include <utility>
39 
40 #include "gazebo/common/Event.hh"
41 #include "gazebo/common/Console.hh"
44 #include "gazebo/util/system.hh"
45 
46 #define HEADER_LENGTH 8
47 
48 namespace gazebo
49 {
50  namespace transport
51  {
52  extern GZ_TRANSPORT_VISIBLE bool is_stopped();
53 
54  class IOManager;
55  class Connection;
56  typedef boost::shared_ptr<Connection> ConnectionPtr;
57 
61  class GZ_TRANSPORT_VISIBLE ConnectionReadTask : public tbb::task
62  {
67  public: ConnectionReadTask(
68  boost::function<void (const std::string &)> _func,
69  const std::string &_data) :
70  func(_func),
71  data(_data)
72  {
73  }
74 
77  public: tbb::task *execute()
78  {
79  this->func(this->data);
80  return NULL;
81  }
82 
84  private: boost::function<void (const std::string &)> func;
85 
87  private: std::string data;
88  };
90 
105  class GZ_TRANSPORT_VISIBLE Connection :
106  public boost::enable_shared_from_this<Connection>
107  {
109  public: Connection();
110 
112  public: virtual ~Connection();
113 
118  public: bool Connect(const std::string &_host, unsigned int _port);
119 
121  typedef boost::function<void(const ConnectionPtr&)> AcceptCallback;
122 
127  public: void Listen(unsigned int _port, const AcceptCallback &_acceptCB);
128 
130  typedef boost::function<void(const std::string &_data)> ReadCallback;
131 
135  public: void StartRead(const ReadCallback &_cb);
136 
138  public: void StopRead();
139 
141  public: void Shutdown();
142 
145  public: bool IsOpen() const;
146 
148  private: void Close();
149 
151  public: void Cancel();
152 
156  public: bool Read(std::string &_data);
157 
165  public: void EnqueueMsg(const std::string &_buffer,
166  boost::function<void(uint32_t)> _cb, uint32_t _id,
167  bool _force = false);
168 
173  public: void EnqueueMsg(const std::string &_buffer, bool _force = false);
174 
177  public: std::string GetLocalURI() const;
178 
181  public: std::string GetRemoteURI() const;
182 
185  public: std::string GetLocalAddress() const;
186 
189  public: unsigned int GetLocalPort() const;
190 
193  public: std::string GetRemoteAddress() const;
194 
197  public: unsigned int GetRemotePort() const;
198 
201  public: std::string GetRemoteHostname() const;
202 
205  public: static std::string GetLocalHostname();
206 
209  public: template<typename Handler>
210  void AsyncRead(Handler _handler)
211  {
212  boost::mutex::scoped_lock lock(this->socketMutex);
213  if (!this->IsOpen())
214  {
215  gzerr << "AsyncRead on a closed socket\n";
216  return;
217  }
218 
219  void (Connection::*f)(const boost::system::error_code &,
220  boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
221 
222  this->inboundHeader.resize(HEADER_LENGTH);
223  boost::asio::async_read(*this->socket,
224  boost::asio::buffer(this->inboundHeader),
225  common::weakBind(f, this->shared_from_this(),
226  boost::asio::placeholders::error,
227  boost::make_tuple(_handler)));
228  }
229 
237  private: template<typename Handler>
238  void OnReadHeader(const boost::system::error_code &_e,
239  boost::tuple<Handler> _handler)
240  {
241  if (_e)
242  {
243  if (_e.value() == boost::asio::error::eof)
244  this->isOpen = false;
245  }
246  else
247  {
248  std::size_t inboundData_size = 0;
249  std::string header(&this->inboundHeader[0],
250  this->inboundHeader.size());
251  this->inboundHeader.clear();
252 
253  inboundData_size = this->ParseHeader(header);
254 
255  if (inboundData_size > 0)
256  {
257  // Start the asynchronous call to receive data
258  this->inboundData.resize(inboundData_size);
259 
260  void (Connection::*f)(const boost::system::error_code &e,
261  boost::tuple<Handler>) =
262  &Connection::OnReadData<Handler>;
263 
264  boost::asio::async_read(*this->socket,
265  boost::asio::buffer(this->inboundData),
266  common::weakBind(f, this->shared_from_this(),
267  boost::asio::placeholders::error,
268  _handler));
269  }
270  else
271  {
272  gzerr << "Header is empty\n";
273  boost::get<0>(_handler)("");
274  // This code tries to read the header again. We should
275  // never get here.
276  // this->inboundHeader.resize(HEADER_LENGTH);
277 
278  // void (Connection::*f)(const boost::system::error_code &,
279  // boost::tuple<Handler>) =
280  // &Connection::OnReadHeader<Handler>;
281 
282  // boost::asio::async_read(*this->socket,
283  // boost::asio::buffer(this->inboundHeader),
284  // common::weakBind(f, this->shared_from_this(),
285  // boost::asio::placeholders::error, _handler));
286  }
287  }
288  }
289 
297  private: template<typename Handler>
298  void OnReadData(const boost::system::error_code &_e,
299  boost::tuple<Handler> _handler)
300  {
301  if (_e)
302  {
303  if (_e.value() == boost::asio::error::eof)
304  this->isOpen = false;
305  }
306 
307  // Inform caller that data has been received
308  std::string data(&this->inboundData[0],
309  this->inboundData.size());
310  this->inboundData.clear();
311 
312  if (data.empty())
313  gzerr << "OnReadData got empty data!!!\n";
314 
315  if (!_e && !transport::is_stopped())
316  {
317  ConnectionReadTask *task = new(tbb::task::allocate_root())
318  ConnectionReadTask(boost::get<0>(_handler), data);
319  tbb::task::enqueue(*task);
320 
321  // Non-tbb version:
322  // boost::get<0>(_handler)(data);
323  }
324  }
325 
329  public: event::ConnectionPtr ConnectToShutdown(boost::function<void()>
330  _subscriber)
331  { return this->shutdown.Connect(_subscriber); }
332 
334  public: void ProcessWriteQueue(bool _blocking = false);
335 
338  public: unsigned int GetId() const;
339 
343  public: static bool ValidateIP(const std::string &_ip);
344 
348  public: std::string GetIPWhiteList() const;
349 
352  private: void PostWrite();
353 
357  private: void OnWrite(const boost::system::error_code &_e);
358 
361  private: void OnAccept(const boost::system::error_code &_e);
362 
365  private: std::size_t ParseHeader(const std::string &_header);
366 
368  private: void ReadLoop(const ReadCallback &_cb);
369 
372  private: static boost::asio::ip::tcp::endpoint GetLocalEndpoint();
373 
376  private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint() const;
377 
380  private: static std::string GetHostname(
381  boost::asio::ip::tcp::endpoint _ep);
382 
386  private: void OnConnect(const boost::system::error_code &_error,
387  boost::asio::ip::tcp::resolver::iterator _endPointIter);
388 
390  private: boost::asio::ip::tcp::socket *socket;
391 
393  private: boost::asio::ip::tcp::acceptor *acceptor;
394 
396  private: std::deque<std::string> writeQueue;
397 
400  private: std::deque< std::vector<
401  std::pair<boost::function<void(uint32_t)>, uint32_t> > >
402  callbacks;
403 
405  private: boost::mutex connectMutex;
406 
408  private: boost::recursive_mutex writeMutex;
409 
411  private: boost::recursive_mutex readMutex;
412 
414  private: mutable boost::mutex socketMutex;
415 
417  private: boost::condition_variable connectCondition;
418 
420  private: AcceptCallback acceptCB;
421 
423  private: std::vector<char> inboundHeader;
424 
426  private: std::vector<char> inboundData;
427 
429  private: bool readQuit;
430 
432  private: unsigned int id;
433 
435  private: static unsigned int idCounter;
436 
438  private: ConnectionPtr acceptConn;
439 
441  private: event::EventT<void()> shutdown;
442 
444  private: static IOManager *iomanager;
445 
447  private: unsigned int writeCount;
448 
450  private: std::string localURI;
451 
453  private: std::string localAddress;
454 
456  private: std::string remoteURI;
457 
459  private: std::string remoteAddress;
460 
462  private: bool connectError;
463 
465  private: std::string ipWhiteList;
466 
468  private: bool dropMsgLogged;
469 
471  private: bool isOpen;
472  };
474  }
475 }
476 #endif
#define NULL
Definition: CommonTypes.hh:31
transport
Definition: ConnectionManager.hh:35
#define HEADER_LENGTH
Definition: Connection.hh:46
A class for event processing.
Definition: Event.hh:100
Single TCP/IP connection manager.
Definition: Connection.hh:107
bool IsOpen() const
Is the connection open?
void ProcessWriteQueue(bool _blocking=false)
Handle on-write callbacks.
bool Connect(const std::string &_host, unsigned int _port)
Connect to a remote host.
void EnqueueMsg(const std::string &_buffer, boost::function< void(uint32_t)> _cb, uint32_t _id, bool _force=false)
Write data to the socket.
std::string GetLocalURI() const
Get the local URI.
unsigned int GetId() const
Get the ID of the connection.
void Listen(unsigned int _port, const AcceptCallback &_acceptCB)
Start a server that listens on a port.
void StartRead(const ReadCallback &_cb)
Start a thread that reads from the connection and passes new message to the ReadCallback.
std::string GetRemoteURI() const
Get the remote URI.
std::string GetRemoteHostname() const
Get the remote hostname.
std::string GetIPWhiteList() const
Get the IP white list, from GAZEBO_IP_WHITE_LIST environment variable.
static std::string GetLocalHostname()
Get the local hostname.
unsigned int GetLocalPort() const
Get the port of this connection.
void Cancel()
Cancel all async operations on an open socket.
std::string GetRemoteAddress() const
Get the remote address.
void EnqueueMsg(const std::string &_buffer, bool _force=false)
Write data to the socket.
unsigned int GetRemotePort() const
Get the remote port number.
event::ConnectionPtr ConnectToShutdown(boost::function< void()> _subscriber)
Register a function to be called when the connection is shut down.
Definition: Connection.hh:329
void Shutdown()
Shutdown the socket.
void StopRead()
Stop the read loop.
bool Read(std::string &_data)
Read data from the socket.
std::string GetLocalAddress() const
Get the local address of this connection.
virtual ~Connection()
Destructor.
static bool ValidateIP(const std::string &_ip)
Return true if the _ip is a valid.
void AsyncRead(Handler _handler)
Peform an asyncronous read param[in] _handler Callback to invoke on received data.
Definition: Connection.hh:210
boost::function< void(const std::string &_data)> ReadCallback
The signature of a connection read callback.
Definition: Connection.hh:130
boost::function< void(const ConnectionPtr &)> AcceptCallback
The signature of a connection accept callback.
Definition: Connection.hh:121
Manages boost::asio IO.
Definition: IOManager.hh:36
auto weakBind(Func _func, boost::shared_ptr< T > _ptr, Args... _args) -> decltype(details::makeWeakBinder(boost::bind(_func, _ptr.get(), _args...), boost::weak_ptr< T >(_ptr)))
Definition: WeakBind.hh:114
#define gzerr
Output an error message.
Definition: Console.hh:50
bool is_stopped()
Is the transport system stopped?
boost::shared_ptr< Connection > ConnectionPtr
Definition: CommonTypes.hh:134
boost::shared_ptr< Connection > ConnectionPtr
Definition: Connection.hh:55
Forward declarations for the common classes.
Definition: Animation.hh:27
GAZEBO_VISIBLE bool shutdown()
Stop and cleanup simulation.