// Builds a zmq message whose payload is a msgpack-encoded `reply` object
// followed by a msgpack-encoded `status` object.
// NOTE(review): the embedded line numbers jump (18 -> 20 -> 22 ...), so the
// parameter list, the definition of `reply` and the return statement are not
// visible in this excerpt.
18 std::shared_ptr<zmq::message_t> CreateStatusMessage(
20 msgpack::sbuffer sbuf;
// Both objects are appended to the same buffer: reply first, then status.
22 msgpack::pack(sbuf, reply);
23 msgpack::pack(sbuf, status);
// The message is constructed from the buffer's data pointer and byte count.
24 std::shared_ptr<zmq::message_t> msg =
25 std::make_shared<zmq::message_t>(sbuf.data(), sbuf.size());
// Constructs a receiver from an address string and an integer timeout.
// The visible initializers start the object with no recorded mainloop error:
// the error code is 0 and the stored exception carries an empty message.
// NOTE(review): the remaining member initializers (embedded lines 36-39) are
// not visible in this excerpt.
35 ZMQReceiver::ZMQReceiver(
const std::string& address,
int timeout)
40 mainloop_error_code_(0),
41 mainloop_exception_(
"") {}
// NOTE(review): the enclosing signature is not visible; the thread launch and
// the "already running" log text suggest this is the body of
// ZMQReceiver::Start() — confirm against the full file.
// Takes mutex_ for the lifetime of `lock`.
47 const std::lock_guard<std::mutex> lock(mutex_);
// Run Mainloop() on a new thread bound to this object.
50 thread_ = std::thread(&ZMQReceiver::Mainloop,
this);
// Busy-wait (yielding the CPU) until the mainloop either reports that it is
// running or reports an error; both flags are read with load().
52 while (!loop_running_.load() && !mainloop_error_code_.load()) {
53 std::this_thread::yield();
// Branch taken when no error code has been set (body not visible here).
56 if (!mainloop_error_code_.load()) {
60 LogDebug(
"ZMQReceiver: already running");
// Branch taken when an error code has been set (body not visible here).
64 if (mainloop_error_code_.load()) {
// NOTE(review): the enclosing signature is not visible; the "already stopped"
// log text suggests this is the body of ZMQReceiver::Stop() — confirm.
// Snapshot of keep_running_, tested again below at embedded line 78.
70 bool keep_running_old;
// With mutex_ held: remember the current keep_running_ value and, if it was
// set, clear it.
72 const std::lock_guard<std::mutex> lock(mutex_);
73 keep_running_old = keep_running_;
74 if (keep_running_old) {
75 keep_running_ =
false;
// Second test of the snapshot; what it guards is not visible in this excerpt.
78 if (keep_running_old) {
82 LogDebug(
"ZMQReceiver: already stopped");
// NOTE(review): the enclosing signature and the return statement are not
// visible; this presumably is the body of ZMQReceiver::GetLastError() and
// `result` is presumably what it returns — confirm.
// With mutex_ held: clear the error code, copy the stored exception into
// `result`, then reset the stored exception to one with an empty message.
87 const std::lock_guard<std::mutex> lock(mutex_);
88 mainloop_error_code_.store(0);
89 std::runtime_error
result = mainloop_exception_;
90 mainloop_exception_ = std::runtime_error(
"");
// Receiver loop, started on its own thread via std::thread(&ZMQReceiver::Mainloop, this).
// It creates and binds a ZMQ_REP socket, then repeatedly receives a message,
// decodes the msgpack objects it contains, hands them to processor_, and sends
// the collected replies, until keep_running_ is found to be false.
// NOTE(review): many source lines are missing from this excerpt (the embedded
// line numbers have gaps), so some braces, declarations and branches referred
// to below are not visible.
94 void ZMQReceiver::Mainloop() {
// Create a ZMQ_REP socket on *context_.
96 socket_ = std::unique_ptr<zmq::socket_t>(
97 new zmq::socket_t(*context_, ZMQ_REP));
// Socket options: linger 0, receive timeout 1000, send timeout from timeout_.
// Per the ZeroMQ option docs these timeouts are in milliseconds; the finite
// receive timeout presumably lets the loop re-check keep_running_ — confirm.
99 socket_->set(zmq::sockopt::linger, 0);
100 socket_->set(zmq::sockopt::rcvtimeo, 1000);
101 socket_->set(zmq::sockopt::sndtimeo, timeout_);
// Limits object passed to the msgpack::unpack calls below (remaining
// arguments are not visible in this excerpt).
103 auto limits = msgpack::unpack_limit(0xffffffff,
// Bind to address_. A zmq::error_t is converted into mainloop_exception_ and
// signalled by storing error code 1.
111 socket_->bind(address_.c_str());
112 }
catch (
const zmq::error_t& err) {
113 mainloop_exception_ = std::runtime_error(
114 "ZMQReceiver::Mainloop: Failed to bind address, " +
115 std::string(err.what()));
116 mainloop_error_code_.store(1);
// Publish that the mainloop is running.
120 loop_running_.store(
true);
// keep_running_ is read with mutex_ held; the loop is left once it is false.
123 const std::lock_guard<std::mutex> lock(mutex_);
124 if (!keep_running_)
break;
// Receive one message; the handling of a falsy recv() result is on lines not
// visible in this excerpt.
127 zmq::message_t message;
128 if (!socket_->recv(message)) {
// Raw view of the payload for msgpack.
// NOTE(review): C-style cast; a named cast would be more conventional.
132 const char* buffer = (
char*)message.data();
133 size_t buffer_size = message.size();
// Replies collected for the objects decoded from this message.
135 std::vector<std::shared_ptr<zmq::message_t>> replies;
// Decode until `offset` reaches the end of the payload; `offset` is passed to
// msgpack::unpack, which advances it past each decoded object.
138 while (
offset < buffer_size) {
// The first object of each pair is decoded as a messages::Request.
139 messages::Request req;
142 msgpack::unpack(buffer, buffer_size,
offset,
143 nullptr,
nullptr, limits);
144 auto obj = obj_handle.get();
145 req = obj.as<messages::Request>();
149 "ZMQReceiver::Mainloop: message processor is "
// PROCESS_MESSAGE(MSGTYPE) expands to an `else if` arm that matches
// req.msg_id against MSGTYPE::MsgId(), unpacks the next object as MSGTYPE,
// calls processor_->ProcessMessage(req, msg, oh) and queues its reply. An
// ErrorProcessingMessage status is queued on a path whose condition is not
// visible in this excerpt.
152 #define PROCESS_MESSAGE(MSGTYPE) \
153 else if (MSGTYPE::MsgId() == req.msg_id) { \
154 auto oh = msgpack::unpack(buffer, buffer_size, offset, nullptr, \
156 auto obj = oh.get(); \
158 msg = obj.as<MSGTYPE>(); \
159 auto reply = processor_->ProcessMessage(req, msg, oh); \
161 replies.push_back(reply); \
163 replies.push_back(CreateStatusMessage( \
164 messages::Status::ErrorProcessingMessage())); \
// Per the log text this is the path for an unsupported message id; a status
// reply (`status` is defined on a line not visible here) is queued.
174 LogInfo(
"ZMQReceiver::Mainloop: unsupported msg "
178 replies.push_back(CreateStatusMessage(status));
// A std::exception caught here is logged and answered with a status whose
// text is extended by " with " plus the exception message.
// NOTE(review): the exception is caught by non-const reference, and the "a"
// in the log prefix "Mainloop:a" looks like a stray character — confirm.
181 }
catch (std::exception& err) {
182 LogInfo(
"ZMQReceiver::Mainloop:a {}", err.what());
184 status.str += std::string(
" with ") + err.what();
185 replies.push_back(CreateStatusMessage(status));
// Exactly one reply: send it as-is.
189 if (replies.size() == 1) {
190 socket_->send(*replies[0], zmq::send_flags::none);
// Otherwise the replies are copied into one message of `size` bytes and sent
// with a single send(). The first loop's body and the updates of `size` and
// `offset` are not visible; presumably `size` is the sum of the reply sizes
// and `offset` advances by r->size() after each copy — confirm.
// NOTE(review): `auto r` copies a shared_ptr on every iteration.
193 for (
auto r : replies) {
196 zmq::message_t reply(
size);
198 for (
auto r : replies) {
199 memcpy((
char*)reply.data() +
offset, r->data(), r->size());
202 socket_->send(reply, zmq::send_flags::none);
// zmq errors raised in the guarded region are only logged.
204 }
catch (
const zmq::error_t& err) {
205 LogInfo(
"ZMQReceiver::Mainloop: {}", err.what());
// Publish that the mainloop is no longer running.
209 loop_running_.store(
false);
// Stores the given processor in processor_, the object whose ProcessMessage()
// is called from the PROCESS_MESSAGE macro in Mainloop().
// NOTE(review): the first line of the signature is not visible in this
// excerpt; the name SetMessageProcessor is presumed — confirm.
213 std::shared_ptr<MessageProcessorBase> processor) {
214 processor_ = processor;
#define PROCESS_MESSAGE(MSGTYPE)
std::runtime_error GetLastError()
Returns the last error from the mainloop thread.
void SetMessageProcessor(std::shared_ptr< MessageProcessorBase > processor)
Sets the message processor object which will process incoming messages.
void Start()
Starts the receiver mainloop in a new thread.
bool SetActiveCamera(const std::string &path, std::shared_ptr< ConnectionBase > connection)
std::shared_ptr< zmq::context_t > GetZMQContext()
Returns the zeromq context for this process.
bool SetMeshData(const std::string &path, int time, const std::string &layer, const core::Tensor &vertices, const std::map< std::string, core::Tensor > &vertex_attributes, const core::Tensor &faces, const std::map< std::string, core::Tensor > &face_attributes, const core::Tensor &lines, const std::map< std::string, core::Tensor > &line_attributes, const std::string &material, const std::map< std::string, float > &material_scalar_attributes, const std::map< std::string, std::array< float, 4 >> &material_vector_attributes, const std::map< std::string, t::geometry::Image > &texture_maps, const std::string &o3d_type, std::shared_ptr< ConnectionBase > connection)
bool SetTime(int time, std::shared_ptr< ConnectionBase > connection)
Generic file read and write utility for python interface.
static Status ErrorUnpackingFailed()
static std::string MsgId()
static Status ErrorUnsupportedMsgId()