Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 86 additions & 22 deletions src/simulation/vizard/vizInterface/vizInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,21 @@ VizInterface::VizInterface() {
this->opNavMode = 0;
this->saveFile = false;
this->liveStream = false;
this->broadcastStream = false;
this->FrameNumber = -1;

this->lastSettingsSendTime = time(0); // current system time in seconds
this->broadcastSettingsSendDelay = 2; // real-time seconds

this->firstPass = 0;

this->comProtocol = "tcp";
this->comAddress = "localhost";
this->comPortNumber = "5556";
this->reqComProtocol = "tcp";
this->reqComAddress = "localhost";
this->reqPortNumber = "5556";

this->pubComProtocol = "tcp";
this->pubComAddress = "localhost";
this->pubPortNumber = "5570";

return;
}
Expand All @@ -46,22 +54,51 @@ VizInterface::~VizInterface() {
@param currentSimNanos The current sim time in nanoseconds
*/
void VizInterface::reset(uint64_t currentSimNanos) {
if (this->broadcastStream) {
// Setup ZMQ for broadcast socket
this->publisher_context = zmq_ctx_new();
this->publisher_socket = zmq_socket(this->publisher_context, ZMQ_PUB);
assert(this->publisher_socket);
// Build address
std::string text = this->pubComProtocol + "://" + this->pubComAddress + ":" + this->pubPortNumber;
int broadcastConnect = zmq_bind(this->publisher_socket, text.c_str());
// Error if bind failure
if (broadcastConnect != 0) {
int error_code = zmq_errno();
text = "Broadcast socket did not connect correctly. ZMQ error code: " + std::to_string(error_code);
bskLogger.bskLog(BSK_ERROR, text.c_str());
return;
}
text = "Broadcasting at " + this->pubComProtocol + "://" + this->pubComAddress + ":" + this->pubPortNumber;
bskLogger.bskLog(BSK_INFORMATION, text.c_str());
}

if (this->opNavMode > 0 || this->liveStream) {
/* setup zeroMQ */
// Reset cameras
for (size_t camCounter = 0; camCounter < this->cameraConfInMsgs.size(); camCounter++) {
this->bskImagePtrs[camCounter] = NULL;
}
this->context = zmq_ctx_new();
this->requester_socket = zmq_socket(this->context, ZMQ_REQ);
zmq_connect(this->requester_socket,
(this->comProtocol + "://" + this->comAddress + ":" + this->comPortNumber).c_str());
// Setup ZMQ for 2-way socket
this->requester_context = zmq_ctx_new();
this->requester_socket = zmq_socket(this->requester_context, ZMQ_REQ);
assert(this->requester_socket);
// Build address
std::string text = this->reqComProtocol + "://" + this->reqComAddress + ":" + this->reqPortNumber;
int twoWayConnect = zmq_connect(this->requester_socket, text.c_str());
// Error if connection failure
if (twoWayConnect != 0) {
int error_code = zmq_errno();
text = "2-way socket did not connect correctly. ZMQ error code: " + std::to_string(error_code);
bskLogger.bskLog(BSK_ERROR, text.c_str());
return;
}

void* message = malloc(4 * sizeof(char));
memcpy(message, "PING", 4);
zmq_msg_t request;

std::string text;
text = "Waiting for Vizard at " + this->comProtocol + "://" + this->comAddress + ":" + this->comPortNumber;
text =
"Waiting for Vizard at " + this->reqComProtocol + "://" + this->reqComAddress + ":" + this->reqPortNumber;
bskLogger.bskLog(BSK_INFORMATION, text.c_str());

zmq_msg_init_data(&request, message, 4, message_buffer_deallocate, NULL);
Expand Down Expand Up @@ -424,8 +461,10 @@ void VizInterface::ReadBSKMessages() {
void VizInterface::WriteProtobuffer(uint64_t currentSimNanos) {
vizProtobufferMessage::VizMessage* message = new vizProtobufferMessage::VizMessage;

/*! Send the Vizard settings once */
if (this->settings.dataFresh) {
/*! Send the Vizard settings according to interval set by broadcastSettingsSendDelay field */
this->now = time(0);
if (this->settings.dataFresh ||
(this->broadcastStream && (this->now - this->lastSettingsSendTime) >= this->broadcastSettingsSendDelay)) {
vizProtobufferMessage::VizMessage::VizSettingsPb* vizSettings;
vizSettings = new vizProtobufferMessage::VizMessage::VizSettingsPb;

Expand Down Expand Up @@ -721,6 +760,7 @@ void VizInterface::WriteProtobuffer(uint64_t currentSimNanos) {
message->set_allocated_settings(vizSettings);

this->settings.dataFresh = false;
this->lastSettingsSendTime = now;
}

/*! Send the Vizard live settings */
Expand Down Expand Up @@ -1044,13 +1084,40 @@ void VizInterface::WriteProtobuffer(uint64_t currentSimNanos) {
}

{
google::protobuf::uint8 varIntBuffer[4];
// Serialize message (as is)
uint32_t byteCount = (uint32_t)message->ByteSizeLong();
google::protobuf::uint8* end =
google::protobuf::io::CodedOutputStream::WriteVarint32ToArray(byteCount, varIntBuffer);
unsigned long varIntBytes = (unsigned long)(end - varIntBuffer);
if (this->saveFile) {
this->outputStream->write(reinterpret_cast<char*>(varIntBuffer), (int)varIntBytes);
void* serialized_message = malloc(byteCount);
message->SerializeToArray(serialized_message, (int)byteCount);

// BROADCAST MODE
if (this->broadcastStream) {
// Send serialized message to BROADCAST (PUBLISHER) socket */
int sendStatus = zmq_send(this->publisher_socket, "SIM_UPDATE", 10, ZMQ_SNDMORE);
if (sendStatus == -1) {
bskLogger.bskLog(BSK_ERROR, "Broadcast header did not send to socket.");
}
sendStatus = zmq_send(this->publisher_socket, serialized_message, byteCount, 0);
if (sendStatus == -1) {
bskLogger.bskLog(BSK_ERROR, "Broadcast protobuffer did not send to socket.");
}
}

/* If settings were broadcast, remove from message before saving & sending to 2-way socket to reduce message
size. Message must be re-serialized here as its contents have changed. */
if (this->liveStream && (this->firstPass != 0) && (this->lastSettingsSendTime == this->now)) {
// Zero-out settings to reduce message size if not at first timestep
message->set_allocated_settings(nullptr);
// Re-serialize
google::protobuf::uint8 varIntBuffer[4];
byteCount = (uint32_t)message->ByteSizeLong();
google::protobuf::uint8* end =
google::protobuf::io::CodedOutputStream::WriteVarint32ToArray(byteCount, varIntBuffer);
unsigned long varIntBytes = (unsigned long)(end - varIntBuffer);
if (this->saveFile) {
this->outputStream->write(reinterpret_cast<char*>(varIntBuffer), (int)varIntBytes);
}
serialized_message = malloc(byteCount);
message->SerializeToArray(serialized_message, (int)byteCount);
}

/*! Enter in lock-step with the vizard to simulate a camera */
Expand Down Expand Up @@ -1078,10 +1145,7 @@ void VizInterface::WriteProtobuffer(uint64_t currentSimNanos) {
zmq_msg_t receive_buffer;
zmq_msg_init(&receive_buffer);
zmq_msg_recv(&receive_buffer, requester_socket, 0);

/*! - send protobuffer raw over zmq_socket */
void* serialized_message = malloc(byteCount);
message->SerializeToArray(serialized_message, (int)byteCount);
zmq_msg_close(&receive_buffer);

/*! - Normal sim step by sending protobuffers */
zmq_msg_t request_header;
Expand Down
34 changes: 23 additions & 11 deletions src/simulation/vizard/vizInterface/vizInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "vizMessage.pb.h"
#include <zmq.h>
#include <ctime>
#include <fstream>
#include <map>
#include <vector>
Expand Down Expand Up @@ -45,11 +46,13 @@ class VizInterface : public SysModel {
std::vector<LocationPbMsg*> locations; //!< [] vector of ground or spacecraft locations
std::vector<GravBodyInfo> gravBodyInformation; //!< [-] vector of gravitational body info
std::vector<Message<CameraImageMsgPayload>*>
opnavImageOutMsgs; //!< vector of vizard instrument camera output messages
int opNavMode; /*!< [int] Set non-zero positive value if Unity/Viz couple in direct
communication. (1 - regular opNav, 2 - performance opNav) */
bool saveFile; //!< [Bool] Set True if Vizard should save a file of the data.
bool liveStream; //!< [Bool] Set True if Vizard should receive a live stream of BSK data.
opnavImageOutMsgs; //!< vector of vizard instrument camera output messages
int opNavMode; /*!< [int] Set non-zero positive value if Unity/Viz couple in direct
communication. (1 - regular opNav, 2 - performance opNav) */
bool saveFile; //!< [Bool] Set True if Vizard should save a file of the data.

bool liveStream; //!< [Bool] Set True if Vizard should receive a live stream of BSK data.
bool broadcastStream; //!< [Bool] Set True if messages should be broadcast for listener Vizards to pick up.
std::vector<void*> bskImagePtrs; /*!< [RUN] vector of permanent pointers for the images to be used in BSK
without relying on ZMQ because ZMQ will free it (whenever, who knows) */

Expand All @@ -62,9 +65,13 @@ class VizInterface : public SysModel {
VizSettings settings; //!< [-] container for the Viz settings that can be specified from BSK
LiveVizSettings liveSettings; //!< [-] container for Viz settings that are updated on each time step

std::string comProtocol; //!< Communication protocol to use when connecting to Vizard
std::string comAddress; //!< Communication address to use when connecting to Vizard
std::string comPortNumber; //!< Communication port number to use when connecting to Vizard
std::string reqComProtocol; //!< Communication protocol to use when connecting to 2-way Vizard
std::string reqComAddress; //!< Communication address to use when connecting to 2-way Vizard
std::string reqPortNumber; //!< Communication port number to use when connecting to 2-way Vizard
std::string pubComProtocol; //!< Communication protocol to use when connecting to broadcast Vizard
std::string pubComAddress; //!< Communication address to use when connecting to broadcast Vizard
std::string pubPortNumber; //!< Communication port number to use when connecting to broadcast Vizard
double broadcastSettingsSendDelay; //!< Real-time delay between sending Viz settings to broadcast socket

ReadFunctor<EpochMsgPayload> epochInMsg; //!< [-] simulation epoch date/time input msg
MsgCurrStatus epochMsgStatus; //!< [-] ID of the epoch msg
Expand All @@ -74,13 +81,18 @@ class VizInterface : public SysModel {

private:
// ZeroMQ State
void* context;
void* requester_socket;
int firstPass; //!< Flag to intialize the viz at first timestep
void* requester_context; //!< 2-way context pointer (container for socket management)
void* requester_socket; //!< 2-way socket pointer (ZMQ_REQ)
void* publisher_context; //!< Broadcast context pointer (container for socket management)
void* publisher_socket; //!< Broadcast socket pointer (ZMQ_PUB)
int firstPass; //!< Flag to intialize the viz at first timestep

std::vector<MsgCurrStatus> spiceInMsgStatus; //!< [-] status of the incoming planets' spice data messages
std::vector<SpicePlanetStateMsgPayload> spiceMessage; //!< [-] Spice message copies
std::ofstream* outputStream; //!< [-] Output file stream opened in reset
int64_t now; //!< Current system time stamp
int64_t lastSettingsSendTime; //!< System time stamp when settings message was last sent to broadcast socket

void requestImage(size_t camCounter,
uint64_t currentSimNanos); //!< request image from Vizard and store it in output img msg
};
Expand Down
11 changes: 10 additions & 1 deletion src/utilities/vizSupport.py
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,8 @@ def enableUnityVisualization(scSim, simTaskName, scList, **kwargs):
flag if opNaveMode should be used
liveStream: bool
flag if live data streaming to Vizard should be used
broadcastStream: bool
flag if messages should be broadcast for listener Vizards to pick up.
genericStorageList:
list of lists of ``GenericStorage`` structures. The outer list length must match ``scList``.
lightList:
Expand Down Expand Up @@ -1140,7 +1142,7 @@ def enableUnityVisualization(scSim, simTaskName, scList, **kwargs):
global firstSpacecraftName

unitTestSupport.checkMethodKeyword(
['saveFile', 'opNavMode', 'rwEffectorList', 'thrEffectorList', 'thrColors', 'liveStream', 'cssList',
['saveFile', 'opNavMode', 'rwEffectorList', 'thrEffectorList', 'thrColors', 'cssList', 'liveStream', 'broadcastStream',
'genericSensorList', 'transceiverList', 'genericStorageList', 'lightList', 'spriteList',
'modelDictionaryKeyList', 'oscOrbitColorList', 'trueOrbitColorList', 'logoTextureList',
'msmInfoList', 'ellipsoidList', 'trueOrbitColorInMsgList'],
Expand Down Expand Up @@ -1531,6 +1533,13 @@ def enableUnityVisualization(scSim, simTaskName, scList, **kwargs):
exit(1)

vizMessenger.opNavMode = 0
if 'broadcastStream' in kwargs:
val = kwargs['broadcastStream']
if not isinstance(val, bool):
print('ERROR: vizSupport: broadcastStream must be True or False')
exit(1)
vizMessenger.broadcastStream = val

if 'opNavMode' in kwargs:
val = kwargs['opNavMode']
if not isinstance(val, int):
Expand Down
Loading