@@ -21,13 +21,21 @@ VizInterface::VizInterface() {
2121 this ->opNavMode = 0 ;
2222 this ->saveFile = false ;
2323 this ->liveStream = false ;
24+ this ->broadcastStream = false ;
2425 this ->FrameNumber = -1 ;
2526
27+ this ->lastSettingsSendTime = time (0 ); // current system time in seconds
28+ this ->broadcastSettingsSendDelay = 2 ; // real-time seconds
29+
2630 this ->firstPass = 0 ;
2731
28- this ->comProtocol = " tcp" ;
29- this ->comAddress = " localhost" ;
30- this ->comPortNumber = " 5556" ;
32+ this ->reqComProtocol = " tcp" ;
33+ this ->reqComAddress = " localhost" ;
34+ this ->reqPortNumber = " 5556" ;
35+
36+ this ->pubComProtocol = " tcp" ;
37+ this ->pubComAddress = " localhost" ;
38+ this ->pubPortNumber = " 5570" ;
3139
3240 return ;
3341}
@@ -46,22 +54,51 @@ VizInterface::~VizInterface() {
4654 @param currentSimNanos The current sim time in nanoseconds
4755 */
4856void VizInterface::reset (uint64_t currentSimNanos) {
57+ if (this ->broadcastStream ) {
58+ // Setup ZMQ for broadcast socket
59+ this ->publisher_context = zmq_ctx_new ();
60+ this ->publisher_socket = zmq_socket (this ->publisher_context , ZMQ_PUB);
61+ assert (this ->publisher_socket );
62+ // Build address
63+ std::string text = this ->pubComProtocol + " ://" + this ->pubComAddress + " :" + this ->pubPortNumber ;
64+ int broadcastConnect = zmq_bind (this ->publisher_socket , text.c_str ());
65+ // Error if bind failure
66+ if (broadcastConnect != 0 ) {
67+ int error_code = zmq_errno ();
68+ text = " Broadcast socket did not connect correctly. ZMQ error code: " + std::to_string (error_code);
69+ bskLogger.bskLog (BSK_ERROR, text.c_str ());
70+ return ;
71+ }
72+ text = " Broadcasting at " + this ->pubComProtocol + " ://" + this ->pubComAddress + " :" + this ->pubPortNumber ;
73+ bskLogger.bskLog (BSK_INFORMATION, text.c_str ());
74+ }
75+
4976 if (this ->opNavMode > 0 || this ->liveStream ) {
50- /* setup zeroMQ */
77+ // Reset cameras
5178 for (size_t camCounter = 0 ; camCounter < this ->cameraConfInMsgs .size (); camCounter++) {
5279 this ->bskImagePtrs [camCounter] = NULL ;
5380 }
54- this ->context = zmq_ctx_new ();
55- this ->requester_socket = zmq_socket (this ->context , ZMQ_REQ);
56- zmq_connect (this ->requester_socket ,
57- (this ->comProtocol + " ://" + this ->comAddress + " :" + this ->comPortNumber ).c_str ());
81+ // Setup ZMQ for 2-way socket
82+ this ->requester_context = zmq_ctx_new ();
83+ this ->requester_socket = zmq_socket (this ->requester_context , ZMQ_REQ);
84+ assert (this ->requester_socket );
85+ // Build address
86+ std::string text = this ->reqComProtocol + " ://" + this ->reqComAddress + " :" + this ->reqPortNumber ;
87+ int twoWayConnect = zmq_connect (this ->requester_socket , text.c_str ());
88+ // Error if connection failure
89+ if (twoWayConnect != 0 ) {
90+ int error_code = zmq_errno ();
91+ text = " 2-way socket did not connect correctly. ZMQ error code: " + std::to_string (error_code);
92+ bskLogger.bskLog (BSK_ERROR, text.c_str ());
93+ return ;
94+ }
5895
5996 void * message = malloc (4 * sizeof (char ));
6097 memcpy (message, " PING" , 4 );
6198 zmq_msg_t request;
6299
63- std::string text;
64- text = " Waiting for Vizard at " + this ->comProtocol + " ://" + this ->comAddress + " :" + this ->comPortNumber ;
100+ text =
101+ " Waiting for Vizard at " + this ->reqComProtocol + " ://" + this ->reqComAddress + " :" + this ->reqPortNumber ;
65102 bskLogger.bskLog (BSK_INFORMATION, text.c_str ());
66103
67104 zmq_msg_init_data (&request, message, 4 , message_buffer_deallocate, NULL );
@@ -424,8 +461,10 @@ void VizInterface::ReadBSKMessages() {
424461void VizInterface::WriteProtobuffer (uint64_t currentSimNanos) {
425462 vizProtobufferMessage::VizMessage* message = new vizProtobufferMessage::VizMessage;
426463
427- /* ! Send the Vizard settings once */
428- if (this ->settings .dataFresh ) {
464+ /* ! Send the Vizard settings according to interval set by broadcastSettingsSendDelay field */
465+ this ->now = time (0 );
466+ if (this ->settings .dataFresh ||
467+ (this ->broadcastStream && (this ->now - this ->lastSettingsSendTime ) >= this ->broadcastSettingsSendDelay )) {
429468 vizProtobufferMessage::VizMessage::VizSettingsPb* vizSettings;
430469 vizSettings = new vizProtobufferMessage::VizMessage::VizSettingsPb;
431470
@@ -721,6 +760,7 @@ void VizInterface::WriteProtobuffer(uint64_t currentSimNanos) {
721760 message->set_allocated_settings (vizSettings);
722761
723762 this ->settings .dataFresh = false ;
763+ this ->lastSettingsSendTime = now;
724764 }
725765
726766 /* ! Send the Vizard live settings */
@@ -1044,13 +1084,40 @@ void VizInterface::WriteProtobuffer(uint64_t currentSimNanos) {
10441084 }
10451085
10461086 {
1047- google::protobuf::uint8 varIntBuffer[ 4 ];
1087+ // Serialize message (as is)
10481088 uint32_t byteCount = (uint32_t )message->ByteSizeLong ();
1049- google::protobuf::uint8* end =
1050- google::protobuf::io::CodedOutputStream::WriteVarint32ToArray (byteCount, varIntBuffer);
1051- unsigned long varIntBytes = (unsigned long )(end - varIntBuffer);
1052- if (this ->saveFile ) {
1053- this ->outputStream ->write (reinterpret_cast <char *>(varIntBuffer), (int )varIntBytes);
1089+ void * serialized_message = malloc (byteCount);
1090+ message->SerializeToArray (serialized_message, (int )byteCount);
1091+
1092+ // BROADCAST MODE
1093+ if (this ->broadcastStream ) {
1094+ // Send serialized message to BROADCAST (PUBLISHER) socket */
1095+ int sendStatus = zmq_send (this ->publisher_socket , " SIM_UPDATE" , 10 , ZMQ_SNDMORE);
1096+ if (sendStatus == -1 ) {
1097+ bskLogger.bskLog (BSK_ERROR, " Broadcast header did not send to socket." );
1098+ }
1099+ sendStatus = zmq_send (this ->publisher_socket , serialized_message, byteCount, 0 );
1100+ if (sendStatus == -1 ) {
1101+ bskLogger.bskLog (BSK_ERROR, " Broadcast protobuffer did not send to socket." );
1102+ }
1103+ }
1104+
1105+ /* If settings were broadcast, remove from message before saving & sending to 2-way socket to reduce message
1106+ size. Message must be re-serialized here as its contents have changed. */
1107+ if (this ->liveStream && (this ->firstPass != 0 ) && (this ->lastSettingsSendTime == this ->now )) {
1108+ // Zero-out settings to reduce message size if not at first timestep
1109+ message->set_allocated_settings (nullptr );
1110+ // Re-serialize
1111+ google::protobuf::uint8 varIntBuffer[4 ];
1112+ byteCount = (uint32_t )message->ByteSizeLong ();
1113+ google::protobuf::uint8* end =
1114+ google::protobuf::io::CodedOutputStream::WriteVarint32ToArray (byteCount, varIntBuffer);
1115+ unsigned long varIntBytes = (unsigned long )(end - varIntBuffer);
1116+ if (this ->saveFile ) {
1117+ this ->outputStream ->write (reinterpret_cast <char *>(varIntBuffer), (int )varIntBytes);
1118+ }
1119+ serialized_message = malloc (byteCount);
1120+ message->SerializeToArray (serialized_message, (int )byteCount);
10541121 }
10551122
10561123 /* ! Enter in lock-step with the vizard to simulate a camera */
@@ -1078,10 +1145,7 @@ void VizInterface::WriteProtobuffer(uint64_t currentSimNanos) {
10781145 zmq_msg_t receive_buffer;
10791146 zmq_msg_init (&receive_buffer);
10801147 zmq_msg_recv (&receive_buffer, requester_socket, 0 );
1081-
1082- /* ! - send protobuffer raw over zmq_socket */
1083- void * serialized_message = malloc (byteCount);
1084- message->SerializeToArray (serialized_message, (int )byteCount);
1148+ zmq_msg_close (&receive_buffer);
10851149
10861150 /* ! - Normal sim step by sending protobuffers */
10871151 zmq_msg_t request_header;
0 commit comments