ACloudViewer  3.9.4
A Modern Library for 3D Data Processing
ZMQReceiver.cpp
Go to the documentation of this file.
1 // ----------------------------------------------------------------------------
2 // - CloudViewer: www.cloudViewer.org -
3 // ----------------------------------------------------------------------------
4 // Copyright (c) 2018-2024 www.cloudViewer.org
5 // SPDX-License-Identifier: MIT
6 // ----------------------------------------------------------------------------
7 
9 #include <io/rpc/Messages.h>
10 #include <io/rpc/ZMQContext.h>
11 #include <io/rpc/ZMQReceiver.h>
12 
13 #include <zmq.hpp>
14 
15 using namespace cloudViewer::utility;
16 
17 namespace {
18 std::shared_ptr<zmq::message_t> CreateStatusMessage(
20  msgpack::sbuffer sbuf;
22  msgpack::pack(sbuf, reply);
23  msgpack::pack(sbuf, status);
24  std::shared_ptr<zmq::message_t> msg =
25  std::make_shared<zmq::message_t>(sbuf.data(), sbuf.size());
26 
27  return msg;
28 }
29 } // namespace
30 
31 namespace cloudViewer {
32 namespace io {
33 namespace rpc {
34 
35 ZMQReceiver::ZMQReceiver(const std::string& address, int timeout)
36  : address_(address),
37  timeout_(timeout),
38  keep_running_(false),
39  loop_running_(false),
40  mainloop_error_code_(0),
41  mainloop_exception_("") {}
42 
44 
46  {
47  const std::lock_guard<std::mutex> lock(mutex_);
48  if (!keep_running_) {
49  keep_running_ = true;
50  thread_ = std::thread(&ZMQReceiver::Mainloop, this);
51  // wait for the loop to start running
52  while (!loop_running_.load() && !mainloop_error_code_.load()) {
53  std::this_thread::yield();
54  };
55 
56  if (!mainloop_error_code_.load()) {
57  LogDebug("ZMQReceiver: started");
58  }
59  } else {
60  LogDebug("ZMQReceiver: already running");
61  }
62  }
63 
64  if (mainloop_error_code_.load()) {
65  LogError(GetLastError().what());
66  }
67 }
68 
70  bool keep_running_old;
71  {
72  const std::lock_guard<std::mutex> lock(mutex_);
73  keep_running_old = keep_running_;
74  if (keep_running_old) {
75  keep_running_ = false;
76  }
77  }
78  if (keep_running_old) {
79  thread_.join();
80  LogDebug("ZMQReceiver: stopped");
81  } else {
82  LogDebug("ZMQReceiver: already stopped");
83  }
84 }
85 
86 std::runtime_error ZMQReceiver::GetLastError() {
87  const std::lock_guard<std::mutex> lock(mutex_);
88  mainloop_error_code_.store(0);
89  std::runtime_error result = mainloop_exception_;
90  mainloop_exception_ = std::runtime_error("");
91  return result;
92 }
93 
94 void ZMQReceiver::Mainloop() {
95  context_ = GetZMQContext();
96  socket_ = std::unique_ptr<zmq::socket_t>(
97  new zmq::socket_t(*context_, ZMQ_REP));
98 
99  socket_->set(zmq::sockopt::linger, 0);
100  socket_->set(zmq::sockopt::rcvtimeo, 1000);
101  socket_->set(zmq::sockopt::sndtimeo, timeout_);
102 
103  auto limits = msgpack::unpack_limit(0xffffffff, // array
104  0xffffffff, // map
105  65536, // str
106  0xffffffff, // bin
107  0xffffffff, // ext
108  100 // depth
109  );
110  try {
111  socket_->bind(address_.c_str());
112  } catch (const zmq::error_t& err) {
113  mainloop_exception_ = std::runtime_error(
114  "ZMQReceiver::Mainloop: Failed to bind address, " +
115  std::string(err.what()));
116  mainloop_error_code_.store(1);
117  return;
118  }
119 
120  loop_running_.store(true);
121  while (true) {
122  {
123  const std::lock_guard<std::mutex> lock(mutex_);
124  if (!keep_running_) break;
125  }
126  try {
127  zmq::message_t message;
128  if (!socket_->recv(message)) {
129  continue;
130  }
131 
132  const char* buffer = (char*)message.data();
133  size_t buffer_size = message.size();
134 
135  std::vector<std::shared_ptr<zmq::message_t>> replies;
136 
137  size_t offset = 0;
138  while (offset < buffer_size) {
139  messages::Request req;
140  try {
141  auto obj_handle =
142  msgpack::unpack(buffer, buffer_size, offset,
143  nullptr, nullptr, limits);
144  auto obj = obj_handle.get();
145  req = obj.as<messages::Request>();
146 
147  if (!processor_) {
148  LogError(
149  "ZMQReceiver::Mainloop: message processor is "
150  "null!");
151  }
152 #define PROCESS_MESSAGE(MSGTYPE) \
153  else if (MSGTYPE::MsgId() == req.msg_id) { \
154  auto oh = msgpack::unpack(buffer, buffer_size, offset, nullptr, \
155  nullptr, limits); \
156  auto obj = oh.get(); \
157  MSGTYPE msg; \
158  msg = obj.as<MSGTYPE>(); \
159  auto reply = processor_->ProcessMessage(req, msg, oh); \
160  if (reply) { \
161  replies.push_back(reply); \
162  } else { \
163  replies.push_back(CreateStatusMessage( \
164  messages::Status::ErrorProcessingMessage())); \
165  } \
166  }
168  PROCESS_MESSAGE(messages::GetMeshData)
169  PROCESS_MESSAGE(messages::SetCameraData)
170  PROCESS_MESSAGE(messages::SetProperties)
173  else {
174  LogInfo("ZMQReceiver::Mainloop: unsupported msg "
175  "id '{}'",
176  req.msg_id);
178  replies.push_back(CreateStatusMessage(status));
179  break;
180  }
181  } catch (std::exception& err) {
182  LogInfo("ZMQReceiver::Mainloop:a {}", err.what());
184  status.str += std::string(" with ") + err.what();
185  replies.push_back(CreateStatusMessage(status));
186  break;
187  }
188  }
189  if (replies.size() == 1) {
190  socket_->send(*replies[0], zmq::send_flags::none);
191  } else {
192  size_t size = 0;
193  for (auto r : replies) {
194  size += r->size();
195  }
196  zmq::message_t reply(size);
197  size_t offset = 0;
198  for (auto r : replies) {
199  memcpy((char*)reply.data() + offset, r->data(), r->size());
200  offset += r->size();
201  }
202  socket_->send(reply, zmq::send_flags::none);
203  }
204  } catch (const zmq::error_t& err) {
205  LogInfo("ZMQReceiver::Mainloop: {}", err.what());
206  }
207  }
208  socket_->close();
209  loop_running_.store(false);
210 }
211 
213  std::shared_ptr<MessageProcessorBase> processor) {
214  processor_ = processor;
215 }
216 
217 } // namespace rpc
218 } // namespace io
219 } // namespace cloudViewer
int size
int offset
core::Tensor result
Definition: VtkUtils.cpp:76
#define PROCESS_MESSAGE(MSGTYPE)
std::runtime_error GetLastError()
Returns the last error from the mainloop thread.
Definition: ZMQReceiver.cpp:86
void SetMessageProcessor(std::shared_ptr< MessageProcessorBase > processor)
Sets the message processor object which will process incoming messages.
void Start()
Starts the receiver mainloop in a new thread.
Definition: ZMQReceiver.cpp:45
#define LogInfo(...)
Definition: Logging.h:81
#define LogError(...)
Definition: Logging.h:60
#define LogDebug(...)
Definition: Logging.h:90
bool SetActiveCamera(const std::string &path, std::shared_ptr< ConnectionBase > connection)
std::shared_ptr< zmq::context_t > GetZMQContext()
Returns the zeromq context for this process.
Definition: ZMQContext.cpp:22
bool SetMeshData(const std::string &path, int time, const std::string &layer, const core::Tensor &vertices, const std::map< std::string, core::Tensor > &vertex_attributes, const core::Tensor &faces, const std::map< std::string, core::Tensor > &face_attributes, const core::Tensor &lines, const std::map< std::string, core::Tensor > &line_attributes, const std::string &material, const std::map< std::string, float > &material_scalar_attributes, const std::map< std::string, std::array< float, 4 >> &material_vector_attributes, const std::map< std::string, t::geometry::Image > &texture_maps, const std::string &o3d_type, std::shared_ptr< ConnectionBase > connection)
bool SetTime(int time, std::shared_ptr< ConnectionBase > connection)
Generic file read and write utility for python interface.