Libosmium  2.9.0
Fast and flexible C++ library for working with OpenStreetMap data
reader.hpp
Go to the documentation of this file.
1 #ifndef OSMIUM_IO_READER_HPP
2 #define OSMIUM_IO_READER_HPP
3 
4 /*
5 
6 This file is part of Osmium (http://osmcode.org/libosmium).
7 
8 Copyright 2013-2016 Jochen Topf <jochen@topf.org> and others (see README).
9 
10 Boost Software License - Version 1.0 - August 17th, 2003
11 
12 Permission is hereby granted, free of charge, to any person or organization
13 obtaining a copy of the software and accompanying documentation covered by
14 this license (the "Software") to use, reproduce, display, distribute,
15 execute, and transmit the Software, and to prepare derivative works of the
16 Software, and to permit third-parties to whom the Software is furnished to
17 do so, all subject to the following:
18 
19 The copyright notices in the Software and this entire statement, including
20 the above license grant, this restriction and the following disclaimer,
21 must be included in all copies of the Software, in whole or in part, and
22 all derivative works of the Software, unless such copies or derivative
23 works are solely in the form of machine-executable object code generated by
24 a source language processor.
25 
26 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28 FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29 SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30 FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32 DEALINGS IN THE SOFTWARE.
33 
34 */
35 
36 #include <cerrno>
37 #include <cstdlib>
38 #include <fcntl.h>
39 #include <future>
40 #include <memory>
41 #include <string>
42 #include <system_error>
43 #include <thread>
44 #include <utility>
45 
46 #ifndef _WIN32
47 # include <sys/wait.h>
48 #endif
49 
50 #ifndef _MSC_VER
51 # include <unistd.h>
52 #endif
53 
55 #include <osmium/io/detail/input_format.hpp>
56 #include <osmium/io/detail/read_thread.hpp>
57 #include <osmium/io/detail/read_write.hpp>
58 #include <osmium/io/detail/queue_util.hpp>
59 #include <osmium/io/error.hpp>
60 #include <osmium/io/file.hpp>
61 #include <osmium/io/header.hpp>
62 #include <osmium/memory/buffer.hpp>
64 #include <osmium/thread/util.hpp>
65 #include <osmium/util/config.hpp>
66 
67 namespace osmium {
68 
69  namespace io {
70 
71  namespace detail {
72 
73  inline size_t get_input_queue_size() noexcept {
74  const size_t n = osmium::config::get_max_queue_size("INPUT", 20);
75  return n > 2 ? n : 2;
76  }
77 
78  inline size_t get_osmdata_queue_size() noexcept {
79  const size_t n = osmium::config::get_max_queue_size("OSMDATA", 20);
80  return n > 2 ? n : 2;
81  }
82 
83  } // namespace detail
84 
91  class Reader {
92 
95 
/// States a Reader can be in.
enum class status {
    okay   = 0, ///< normal reading
    error  = 1, ///< some error occurred while reading
    closed = 2, ///< close() was called
    eof    = 3  ///< end of file was reached without error
};

/// Current state of this reader.
status m_status;
102 
104 
105  detail::future_string_queue_type m_input_queue;
106 
107  std::unique_ptr<osmium::io::Decompressor> m_decompressor;
108 
109  osmium::io::detail::ReadThreadManager m_read_thread_manager;
110 
111  detail::future_buffer_queue_type m_osmdata_queue;
112  detail::queue_wrapper<osmium::memory::Buffer> m_osmdata_queue_wrapper;
113 
114  std::future<osmium::io::Header> m_header_future;
116 
118 
119  size_t m_file_size;
120 
121  // This function will run in a separate thread.
122  static void parser_thread(const osmium::io::File& file,
123  detail::future_string_queue_type& input_queue,
124  detail::future_buffer_queue_type& osmdata_queue,
125  std::promise<osmium::io::Header>&& header_promise,
126  osmium::osm_entity_bits::type read_which_entities) {
127  std::promise<osmium::io::Header> promise = std::move(header_promise);
128  const auto creator = detail::ParserFactory::instance().get_creator_function(file);
129  const auto parser = creator(input_queue, osmdata_queue, promise, read_which_entities);
130  parser->parse();
131  }
132 
133 #ifndef _WIN32
134 
/**
 * Fork a child process that runs `command -g filename` with its stdout
 * connected to a pipe, and return the read end of that pipe.
 *
 * @param command Program to run; resolved by execlp().
 * @param filename Passed to the program as the argument after "-g".
 * @param childpid Receives the PID of the child process.
 * @returns File descriptor of the read end of the pipe.
 * @throws std::system_error if pipe() or fork() fails.
 */
static int execute(const std::string& command, const std::string& filename, int* childpid) {
    int pipefd[2];
    if (pipe(pipefd) < 0) {
        throw std::system_error(errno, std::system_category(), "opening pipe failed");
    }
    const pid_t pid = fork();
    if (pid < 0) {
        // Do not leak the pipe. Remember errno first because close()
        // may overwrite it.
        const int fork_errno = errno;
        ::close(pipefd[0]);
        ::close(pipefd[1]);
        throw std::system_error(fork_errno, std::system_category(), "fork failed");
    }
    if (pid == 0) { // child
        // close all file descriptors except one end of the pipe
        for (int i = 0; i < 32; ++i) {
            if (i != pipefd[1]) {
                ::close(i);
            }
        }
        // Use _exit() in the child: exit() would run the atexit handlers
        // and stdio flushes inherited from the parent process.
        if (dup2(pipefd[1], 1) < 0) { // write end of the pipe becomes stdout
            ::_exit(1);
        }
        // The original descriptor is no longer needed. Closing it also
        // makes sure descriptors 0 and 2 are free for the opens below.
        if (pipefd[1] != 1) {
            ::close(pipefd[1]);
        }

        // open() returns the lowest free descriptor, so these become 0 and 2
        ::open("/dev/null", O_RDONLY); // stdin
        ::open("/dev/null", O_WRONLY); // stderr
        // hack: -g switches off globbing in curl which allows [] to be used in file names
        // this is important for XAPI URLs
        // in theory this execute() function could be used for other commands, but it is
        // only used for curl at the moment, so this is okay.
        ::execlp(command.c_str(), command.c_str(), "-g", filename.c_str(), nullptr);

        // execlp() only returns if it failed.
        ::_exit(1);
    }
    // parent
    *childpid = pid;
    ::close(pipefd[1]);
    return pipefd[0];
}
180 #endif
181 
190  static int open_input_file_or_url(const std::string& filename, int* childpid) {
191  std::string protocol = filename.substr(0, filename.find_first_of(':'));
192  if (protocol == "http" || protocol == "https" || protocol == "ftp" || protocol == "file") {
193 #ifndef _WIN32
194  return execute("curl", filename, childpid);
195 #else
196  throw io_error("Reading OSM files from the network currently not supported on Windows.");
197 #endif
198  } else {
199  return osmium::io::detail::open_for_reading(filename);
200  }
201  }
202 
203  public:
204 
215  m_file(file.check()),
216  m_read_which_entities(read_which_entities),
217  m_status(status::okay),
218  m_childpid(0),
219  m_input_queue(detail::get_input_queue_size(), "raw_input"),
220  m_decompressor(m_file.buffer() ?
221  osmium::io::CompressionFactory::instance().create_decompressor(file.compression(), m_file.buffer(), m_file.buffer_size()) :
222  osmium::io::CompressionFactory::instance().create_decompressor(file.compression(), open_input_file_or_url(m_file.filename(), &m_childpid))),
223  m_read_thread_manager(*m_decompressor, m_input_queue),
224  m_osmdata_queue(detail::get_osmdata_queue_size(), "parser_results"),
225  m_osmdata_queue_wrapper(m_osmdata_queue),
226  m_header_future(),
227  m_header(),
228  m_thread(),
229  m_file_size(m_decompressor->file_size()) {
230  std::promise<osmium::io::Header> header_promise;
231  m_header_future = header_promise.get_future();
232  m_thread = osmium::thread::thread_handler{parser_thread, std::ref(m_file), std::ref(m_input_queue), std::ref(m_osmdata_queue), std::move(header_promise), read_which_entities};
233  }
234 
235  explicit Reader(const std::string& filename, osmium::osm_entity_bits::type read_types = osmium::osm_entity_bits::all) :
236  Reader(osmium::io::File(filename), read_types) {
237  }
238 
239  explicit Reader(const char* filename, osmium::osm_entity_bits::type read_types = osmium::osm_entity_bits::all) :
240  Reader(osmium::io::File(filename), read_types) {
241  }
242 
243  Reader(const Reader&) = delete;
244  Reader& operator=(const Reader&) = delete;
245 
246  Reader(Reader&&) = default;
247  Reader& operator=(Reader&&) = default;
248 
249  ~Reader() noexcept {
250  try {
251  close();
252  } catch (...) {
253  // Ignore any exceptions because destructor must not throw.
254  }
255  }
256 
265  void close() {
266  m_status = status::closed;
267 
268  m_read_thread_manager.stop();
269 
270  m_osmdata_queue_wrapper.drain();
271 
272  try {
273  m_read_thread_manager.close();
274  } catch (...) {
275  // Ignore any exceptions.
276  }
277 
278 #ifndef _WIN32
279  if (m_childpid) {
280  int status;
281  const pid_t pid = ::waitpid(m_childpid, &status, 0);
282 #pragma GCC diagnostic push
283 #pragma GCC diagnostic ignored "-Wold-style-cast"
284  if (pid < 0 || !WIFEXITED(status) || WEXITSTATUS(status) != 0) {
285  throw std::system_error(errno, std::system_category(), "subprocess returned error");
286  }
287 #pragma GCC diagnostic pop
288  m_childpid = 0;
289  }
290 #endif
291  }
292 
300  if (m_status == status::error) {
301  throw io_error("Can not get header from reader when in status 'error'");
302  }
303 
304  try {
305  if (m_header_future.valid()) {
306  m_header = m_header_future.get();
307  if (m_read_which_entities == osmium::osm_entity_bits::nothing) {
308  m_status = status::eof;
309  }
310  }
311  } catch (...) {
312  close();
313  m_status = status::error;
314  throw;
315  }
316  return m_header;
317  }
318 
330  osmium::memory::Buffer buffer;
331 
332  if (m_status != status::okay ||
333  m_read_which_entities == osmium::osm_entity_bits::nothing) {
334  throw io_error("Can not read from reader when in status 'closed', 'eof', or 'error'");
335  }
336 
337  try {
338  // m_osmdata_queue_wrapper.pop() can return an invalid buffer to signal EOF,
339  // or a valid buffer with or without data. A valid buffer
340  // without data is not an error, it just means we have to
341  // keep getting the next buffer until there is one with data.
342  while (true) {
343  buffer = m_osmdata_queue_wrapper.pop();
344  if (detail::at_end_of_data(buffer)) {
345  m_status = status::eof;
346  m_read_thread_manager.close();
347  return buffer;
348  }
349  if (buffer.committed() > 0) {
350  return buffer;
351  }
352  }
353  } catch (...) {
354  close();
355  m_status = status::error;
356  throw;
357  }
358  }
359 
364  bool eof() const {
365  return m_status == status::eof || m_status == status::closed;
366  }
367 
372  size_t file_size() const noexcept {
373  return m_file_size;
374  }
375 
390  size_t offset() const noexcept {
391  return m_decompressor->offset();
392  }
393 
394  }; // class Reader
395 
404  template <typename... TArgs>
407 
408  Reader reader(std::forward<TArgs>(args)...);
409  while (osmium::memory::Buffer read_buffer = reader.read()) {
410  buffer.add_buffer(read_buffer);
411  buffer.commit();
412  }
413 
414  return buffer;
415  }
416 
417  } // namespace io
418 
419 } // namespace osmium
420 
421 #endif // OSMIUM_IO_READER_HPP
detail::queue_wrapper< osmium::memory::Buffer > m_osmdata_queue_wrapper
Definition: reader.hpp:112
status
Definition: reader.hpp:96
osmium::memory::Buffer read()
Definition: reader.hpp:329
Reader(const osmium::io::File &file, osmium::osm_entity_bits::type read_which_entities=osmium::osm_entity_bits::all)
Definition: reader.hpp:214
type
Definition: entity_bits.hpp:63
size_t file_size(int fd)
Definition: file.hpp:67
bool eof() const
Definition: reader.hpp:364
int m_childpid
Definition: reader.hpp:103
std::future< osmium::io::Header > m_header_future
Definition: reader.hpp:114
std::unique_ptr< osmium::io::Decompressor > m_decompressor
Definition: reader.hpp:107
object or changeset
Definition: entity_bits.hpp:74
detail::future_string_queue_type m_input_queue
Definition: reader.hpp:105
osmium::memory::Buffer read_file(TArgs &&... args)
Definition: reader.hpp:405
osmium::io::File m_file
Definition: reader.hpp:93
Definition: file.hpp:72
Namespace for everything in the Osmium library.
Definition: assembler.hpp:73
Definition: attr.hpp:333
void add_buffer(const Buffer &buffer)
Definition: buffer.hpp:474
osmium::io::detail::ReadThreadManager m_read_thread_manager
Definition: reader.hpp:109
size_t file_size() const noexcept
Definition: reader.hpp:372
static int execute(const std::string &command, const std::string &filename, int *childpid)
Definition: reader.hpp:145
Definition: reader.hpp:91
~Reader() noexcept
Definition: reader.hpp:249
Definition: error.hpp:44
static void parser_thread(const osmium::io::File &file, detail::future_string_queue_type &input_queue, detail::future_buffer_queue_type &osmdata_queue, std::promise< osmium::io::Header > &&header_promise, osmium::osm_entity_bits::type read_which_entities)
Definition: reader.hpp:122
osmium::io::Header header()
Definition: reader.hpp:299
size_t committed() const noexcept
Definition: buffer.hpp:241
size_t get_max_queue_size(const char *queue_name, size_t default_value) noexcept
Definition: config.hpp:69
Definition: buffer.hpp:97
size_t m_file_size
Definition: reader.hpp:119
osmium::thread::thread_handler m_thread
Definition: reader.hpp:117
static int open_input_file_or_url(const std::string &filename, int *childpid)
Definition: reader.hpp:190
detail::future_buffer_queue_type m_osmdata_queue
Definition: reader.hpp:111
osmium::io::Header m_header
Definition: reader.hpp:115
Definition: entity_bits.hpp:65
Reader(const char *filename, osmium::osm_entity_bits::type read_types=osmium::osm_entity_bits::all)
Definition: reader.hpp:239
Definition: compression.hpp:138
size_t offset() const noexcept
Definition: reader.hpp:390
std::string get(const std::string &key, const std::string &default_value="") const noexcept
Definition: options.hpp:125
Definition: header.hpp:49
void close()
Definition: reader.hpp:265
osmium::osm_entity_bits::type m_read_which_entities
Definition: reader.hpp:94
size_t commit()
Definition: buffer.hpp:335
Definition: util.hpp:85
Reader(const std::string &filename, osmium::osm_entity_bits::type read_types=osmium::osm_entity_bits::all)
Definition: reader.hpp:235