ACloudViewer  3.9.4
A Modern Library for 3D Data Processing
server.h
Go to the documentation of this file.
1 /***********************************************************************
2  * Software License Agreement (BSD License)
3  *
4  * Copyright 2008-2011 Marius Muja (mariusm@cs.ubc.ca). All rights reserved.
5  * Copyright 2008-2011 David G. Lowe (lowe@cs.ubc.ca). All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  * notice, this list of conditions and the following disclaimer in the
15  * documentation and/or other materials provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27  *************************************************************************/
28 
29 
30 #ifndef MPI_SERVER_H_
31 #define MPI_SERVER_H_
32 
33 #include <FLANN/mpi/index.h>
34 #include <stdio.h>
35 #include <time.h>
36 
37 #include <cstdlib>
38 #include <iostream>
39 #include <boost/bind.hpp>
40 #include <boost/shared_ptr.hpp>
41 #include <boost/asio.hpp>
42 #include <boost/thread/thread.hpp>
43 
44 #include "queries.h"
45 
46 namespace flann {
47 
48 namespace mpi {
49 
50 template<typename Distance>
51 class Server
52 {
53 
54  typedef typename Distance::ElementType ElementType;
55  typedef typename Distance::ResultType DistanceType;
56  typedef boost::shared_ptr<tcp::socket> socket_ptr;
58 
59  void session(socket_ptr sock)
60  {
61  boost::mpi::communicator world;
62  try {
64  if (world.rank()==0) {
65  read_object(*sock,req);
66  std::cout << "Received query\n";
67  }
68  // broadcast request to all MPI processes
69  boost::mpi::broadcast(world, req, 0);
70 
72  if (world.rank()==0) {
73  int rows = req.queries.rows;
74  int cols = req.nn;
75  resp.indices = flann::Matrix<int>(new int[rows*cols], rows, cols);
76  resp.dists = flann::Matrix<DistanceType>(new DistanceType[rows*cols], rows, cols);
77  }
78 
79  std::cout << "Searching in process " << world.rank() << "\n";
80  index_->knnSearch(req.queries, resp.indices, resp.dists, req.nn, flann::SearchParams(req.checks));
81 
82  if (world.rank()==0) {
83  std::cout << "Sending result\n";
84  write_object(*sock,resp);
85  }
86 
87  delete[] req.queries.ptr();
88  if (world.rank()==0) {
89  delete[] resp.indices.ptr();
90  delete[] resp.dists.ptr();
91  }
92 
93  }
94  catch (std::exception& e) {
95  std::cerr << "Exception in thread: " << e.what() << "\n";
96  }
97  }
98 
99 
100 
101 public:
102  Server(const std::string& filename, const std::string& dataset, short port, const IndexParams& params) :
103  port_(port)
104  {
105  boost::mpi::communicator world;
106  if (world.rank()==0) {
107  std::cout << "Reading dataset and building index...";
108  std::flush(std::cout);
109  }
110  index_ = new FlannIndex(filename, dataset, params);
111  index_->buildIndex();
112  world.barrier(); // wait for data to be loaded and indexes to be created
113  if (world.rank()==0) {
114  std::cout << "done.\n";
115  }
116  }
117 
118 
119  void run()
120  {
121  boost::mpi::communicator world;
122  boost::shared_ptr<boost::asio::io_service> io_service;
123  boost::shared_ptr<tcp::acceptor> acceptor;
124 
125  if (world.rank()==0) {
126  io_service.reset(new boost::asio::io_service());
127  acceptor.reset(new tcp::acceptor(*io_service, tcp::endpoint(tcp::v4(), port_)));
128  std::cout << "Start listening for queries...\n";
129  }
130  for (;;) {
131  socket_ptr sock;
132  if (world.rank()==0) {
133  sock.reset(new tcp::socket(*io_service));
134  acceptor->accept(*sock);
135  std::cout << "Accepted connection\n";
136  }
137  world.barrier(); // everybody waits here for a connection
138  boost::thread t(boost::bind(&Server::session, this, sock));
139  t.join();
140  }
141 
142  }
143 
144 private:
145  FlannIndex* index_;
146  short port_;
147 };
148 
149 
150 } // namespace mpi
151 } // namespace flann
152 
153 #endif // MPI_SERVER_H_
std::string filename
cmdLineReadable * params[]
T * ptr() const
Definition: matrix.h:127
void buildIndex()
Definition: index.h:130
void knnSearch(const flann::Matrix< ElementType > &queries, flann::Matrix< int > &indices, flann::Matrix< DistanceType > &dists, int knn, const SearchParams &params)
Definition: index.h:196
Server(const std::string &filename, const std::string &dataset, short port, const IndexParams &params)
Definition: server.h:102
void write_object(tcp::socket &sock, const T &val)
Definition: queries.h:86
void read_object(tcp::socket &sock, T &val)
Definition: queries.h:72
std::map< std::string, any > IndexParams
Definition: params.h:51
flann::Matrix< T > queries
Definition: queries.h:44
flann::Matrix< int > indices
Definition: queries.h:58
flann::Matrix< T > dists
Definition: queries.h:59