diff --git a/src/simulation/vizard/vizInterface/vizInterface.cpp b/src/simulation/vizard/vizInterface/vizInterface.cpp index 68432f3bb..b33516ad3 100644 --- a/src/simulation/vizard/vizInterface/vizInterface.cpp +++ b/src/simulation/vizard/vizInterface/vizInterface.cpp @@ -21,13 +21,21 @@ VizInterface::VizInterface() { this->opNavMode = 0; this->saveFile = false; this->liveStream = false; + this->broadcastStream = false; this->FrameNumber = -1; + this->lastSettingsSendTime = time(0); // current system time in seconds + this->broadcastSettingsSendDelay = 2; // real-time seconds + this->firstPass = 0; - this->comProtocol = "tcp"; - this->comAddress = "localhost"; - this->comPortNumber = "5556"; + this->reqComProtocol = "tcp"; + this->reqComAddress = "localhost"; + this->reqPortNumber = "5556"; + + this->pubComProtocol = "tcp"; + this->pubComAddress = "localhost"; + this->pubPortNumber = "5570"; return; } @@ -46,22 +54,51 @@ VizInterface::~VizInterface() { @param currentSimNanos The current sim time in nanoseconds */ void VizInterface::reset(uint64_t currentSimNanos) { + if (this->broadcastStream) { + // Setup ZMQ for broadcast socket + this->publisher_context = zmq_ctx_new(); + this->publisher_socket = zmq_socket(this->publisher_context, ZMQ_PUB); + assert(this->publisher_socket); + // Build address + std::string text = this->pubComProtocol + "://" + this->pubComAddress + ":" + this->pubPortNumber; + int broadcastConnect = zmq_bind(this->publisher_socket, text.c_str()); + // Error if bind failure + if (broadcastConnect != 0) { + int error_code = zmq_errno(); + text = "Broadcast socket did not connect correctly. ZMQ error code: " + std::to_string(error_code); + bskLogger.bskLog(BSK_ERROR, text.c_str()); + return; + } + text = "Broadcasting at " + this->pubComProtocol + "://" + this->pubComAddress + ":" + this->pubPortNumber; + bskLogger.bskLog(BSK_INFORMATION, text.c_str()); + } + if (this->opNavMode > 0 || this->liveStream) { - /* setup zeroMQ */ + // Reset cameras for (size_t camCounter = 0; camCounter < this->cameraConfInMsgs.size(); camCounter++) { this->bskImagePtrs[camCounter] = NULL; } - this->context = zmq_ctx_new(); - this->requester_socket = zmq_socket(this->context, ZMQ_REQ); - zmq_connect(this->requester_socket, - (this->comProtocol + "://" + this->comAddress + ":" + this->comPortNumber).c_str()); + // Setup ZMQ for 2-way socket + this->requester_context = zmq_ctx_new(); + this->requester_socket = zmq_socket(this->requester_context, ZMQ_REQ); + assert(this->requester_socket); + // Build address + std::string text = this->reqComProtocol + "://" + this->reqComAddress + ":" + this->reqPortNumber; + int twoWayConnect = zmq_connect(this->requester_socket, text.c_str()); + // Error if connection failure + if (twoWayConnect != 0) { + int error_code = zmq_errno(); + text = "2-way socket did not connect correctly. ZMQ error code: " + std::to_string(error_code); + bskLogger.bskLog(BSK_ERROR, text.c_str()); + return; + } void* message = malloc(4 * sizeof(char)); memcpy(message, "PING", 4); zmq_msg_t request; - std::string text; - text = "Waiting for Vizard at " + this->comProtocol + "://" + this->comAddress + ":" + this->comPortNumber; + text = + "Waiting for Vizard at " + this->reqComProtocol + "://" + this->reqComAddress + ":" + this->reqPortNumber; bskLogger.bskLog(BSK_INFORMATION, text.c_str()); zmq_msg_init_data(&request, message, 4, message_buffer_deallocate, NULL); @@ -424,8 +461,10 @@ void VizInterface::ReadBSKMessages() { void VizInterface::WriteProtobuffer(uint64_t currentSimNanos) { vizProtobufferMessage::VizMessage* message = new vizProtobufferMessage::VizMessage; - /*! Send the Vizard settings once */ - if (this->settings.dataFresh) { + /*! Send the Vizard settings according to interval set by broadcastSettingsSendDelay field */ + this->now = time(0); + if (this->settings.dataFresh || + (this->broadcastStream && (this->now - this->lastSettingsSendTime) >= this->broadcastSettingsSendDelay)) { vizProtobufferMessage::VizMessage::VizSettingsPb* vizSettings; vizSettings = new vizProtobufferMessage::VizMessage::VizSettingsPb; @@ -721,6 +760,7 @@ void VizInterface::WriteProtobuffer(uint64_t currentSimNanos) { message->set_allocated_settings(vizSettings); this->settings.dataFresh = false; + this->lastSettingsSendTime = now; } /*! Send the Vizard live settings */ @@ -1044,13 +1084,40 @@ void VizInterface::WriteProtobuffer(uint64_t currentSimNanos) { } { - google::protobuf::uint8 varIntBuffer[4]; + // Serialize message (as is) uint32_t byteCount = (uint32_t)message->ByteSizeLong(); - google::protobuf::uint8* end = - google::protobuf::io::CodedOutputStream::WriteVarint32ToArray(byteCount, varIntBuffer); - unsigned long varIntBytes = (unsigned long)(end - varIntBuffer); - if (this->saveFile) { - this->outputStream->write(reinterpret_cast(varIntBuffer), (int)varIntBytes); + void* serialized_message = malloc(byteCount); + message->SerializeToArray(serialized_message, (int)byteCount); + + // BROADCAST MODE + if (this->broadcastStream) { + // Send serialized message to BROADCAST (PUBLISHER) socket */ + int sendStatus = zmq_send(this->publisher_socket, "SIM_UPDATE", 10, ZMQ_SNDMORE); + if (sendStatus == -1) { + bskLogger.bskLog(BSK_ERROR, "Broadcast header did not send to socket."); + } + sendStatus = zmq_send(this->publisher_socket, serialized_message, byteCount, 0); + if (sendStatus == -1) { + bskLogger.bskLog(BSK_ERROR, "Broadcast protobuffer did not send to socket."); + } + } + + /* If settings were broadcast, remove from message before saving & sending to 2-way socket to reduce message + size. Message must be re-serialized here as its contents have changed. */ + if (this->liveStream && (this->firstPass != 0) && (this->lastSettingsSendTime == this->now)) { + // Zero-out settings to reduce message size if not at first timestep + message->set_allocated_settings(nullptr); + // Re-serialize + google::protobuf::uint8 varIntBuffer[4]; + byteCount = (uint32_t)message->ByteSizeLong(); + google::protobuf::uint8* end = + google::protobuf::io::CodedOutputStream::WriteVarint32ToArray(byteCount, varIntBuffer); + unsigned long varIntBytes = (unsigned long)(end - varIntBuffer); + if (this->saveFile) { + this->outputStream->write(reinterpret_cast(varIntBuffer), (int)varIntBytes); + } + serialized_message = malloc(byteCount); + message->SerializeToArray(serialized_message, (int)byteCount); } /*! Enter in lock-step with the vizard to simulate a camera */ @@ -1078,10 +1145,7 @@ void VizInterface::WriteProtobuffer(uint64_t currentSimNanos) { zmq_msg_t receive_buffer; zmq_msg_init(&receive_buffer); zmq_msg_recv(&receive_buffer, requester_socket, 0); - - /*! - send protobuffer raw over zmq_socket */ - void* serialized_message = malloc(byteCount); - message->SerializeToArray(serialized_message, (int)byteCount); + zmq_msg_close(&receive_buffer); /*! - Normal sim step by sending protobuffers */ zmq_msg_t request_header; diff --git a/src/simulation/vizard/vizInterface/vizInterface.h b/src/simulation/vizard/vizInterface/vizInterface.h index 793158993..a3e378e32 100644 --- a/src/simulation/vizard/vizInterface/vizInterface.h +++ b/src/simulation/vizard/vizInterface/vizInterface.h @@ -7,6 +7,7 @@ #include "vizMessage.pb.h" #include +#include #include #include #include @@ -45,11 +46,13 @@ class VizInterface : public SysModel { std::vector locations; //!< [] vector of ground or spacecraft locations std::vector gravBodyInformation; //!< [-] vector of gravitational body info std::vector*> - opnavImageOutMsgs; //!< vector of vizard instrument camera output messages - int opNavMode; /*!< [int] Set non-zero positive value if Unity/Viz couple in direct - communication. (1 - regular opNav, 2 - performance opNav) */ - bool saveFile; //!< [Bool] Set True if Vizard should save a file of the data. - bool liveStream; //!< [Bool] Set True if Vizard should receive a live stream of BSK data. + opnavImageOutMsgs; //!< vector of vizard instrument camera output messages + int opNavMode; /*!< [int] Set non-zero positive value if Unity/Viz couple in direct + communication. (1 - regular opNav, 2 - performance opNav) */ + bool saveFile; //!< [Bool] Set True if Vizard should save a file of the data. + + bool liveStream; //!< [Bool] Set True if Vizard should receive a live stream of BSK data. + bool broadcastStream; //!< [Bool] Set True if messages should be broadcast for listener Vizards to pick up. std::vector bskImagePtrs; /*!< [RUN] vector of permanent pointers for the images to be used in BSK without relying on ZMQ because ZMQ will free it (whenever, who knows) */ @@ -62,9 +65,13 @@ class VizInterface : public SysModel { VizSettings settings; //!< [-] container for the Viz settings that can be specified from BSK LiveVizSettings liveSettings; //!< [-] container for Viz settings that are updated on each time step - std::string comProtocol; //!< Communication protocol to use when connecting to Vizard - std::string comAddress; //!< Communication address to use when connecting to Vizard - std::string comPortNumber; //!< Communication port number to use when connecting to Vizard + std::string reqComProtocol; //!< Communication protocol to use when connecting to 2-way Vizard + std::string reqComAddress; //!< Communication address to use when connecting to 2-way Vizard + std::string reqPortNumber; //!< Communication port number to use when connecting to 2-way Vizard + std::string pubComProtocol; //!< Communication protocol to use when connecting to broadcast Vizard + std::string pubComAddress; //!< Communication address to use when connecting to broadcast Vizard + std::string pubPortNumber; //!< Communication port number to use when connecting to broadcast Vizard + double broadcastSettingsSendDelay; //!< Real-time delay between sending Viz settings to broadcast socket ReadFunctor epochInMsg; //!< [-] simulation epoch date/time input msg MsgCurrStatus epochMsgStatus; //!< [-] ID of the epoch msg @@ -74,13 +81,18 @@ class VizInterface : public SysModel { private: // ZeroMQ State - void* context; - void* requester_socket; - int firstPass; //!< Flag to intialize the viz at first timestep + void* requester_context; //!< 2-way context pointer (container for socket management) + void* requester_socket; //!< 2-way socket pointer (ZMQ_REQ) + void* publisher_context; //!< Broadcast context pointer (container for socket management) + void* publisher_socket; //!< Broadcast socket pointer (ZMQ_PUB) + int firstPass; //!< Flag to intialize the viz at first timestep std::vector spiceInMsgStatus; //!< [-] status of the incoming planets' spice data messages std::vector spiceMessage; //!< [-] Spice message copies std::ofstream* outputStream; //!< [-] Output file stream opened in reset + int64_t now; //!< Current system time stamp + int64_t lastSettingsSendTime; //!< System time stamp when settings message was last sent to broadcast socket + void requestImage(size_t camCounter, uint64_t currentSimNanos); //!< request image from Vizard and store it in output img msg }; diff --git a/src/utilities/vizSupport.py b/src/utilities/vizSupport.py index f0bebd0d3..7a2985b37 100644 --- a/src/utilities/vizSupport.py +++ b/src/utilities/vizSupport.py @@ -1101,6 +1101,8 @@ def enableUnityVisualization(scSim, simTaskName, scList, **kwargs): flag if opNaveMode should be used liveStream: bool flag if live data streaming to Vizard should be used + broadcastStream: bool + flag if messages should be broadcast for listener Vizards to pick up. genericStorageList: list of lists of ``GenericStorage`` structures. The outer list length must match ``scList``. lightList: @@ -1140,7 +1142,7 @@ def enableUnityVisualization(scSim, simTaskName, scList, **kwargs): global firstSpacecraftName unitTestSupport.checkMethodKeyword( - ['saveFile', 'opNavMode', 'rwEffectorList', 'thrEffectorList', 'thrColors', 'liveStream', 'cssList', + ['saveFile', 'opNavMode', 'rwEffectorList', 'thrEffectorList', 'thrColors', 'cssList', 'liveStream', 'broadcastStream', 'genericSensorList', 'transceiverList', 'genericStorageList', 'lightList', 'spriteList', 'modelDictionaryKeyList', 'oscOrbitColorList', 'trueOrbitColorList', 'logoTextureList', 'msmInfoList', 'ellipsoidList', 'trueOrbitColorInMsgList'], @@ -1531,6 +1533,13 @@ def enableUnityVisualization(scSim, simTaskName, scList, **kwargs): exit(1) vizMessenger.opNavMode = 0 + if 'broadcastStream' in kwargs: + val = kwargs['broadcastStream'] + if not isinstance(val, bool): + print('ERROR: vizSupport: broadcastStream must be True or False') + exit(1) + vizMessenger.broadcastStream = val + if 'opNavMode' in kwargs: val = kwargs['opNavMode'] if not isinstance(val, int):