-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsynchronousImageTransmission.py
89 lines (73 loc) · 3.89 KB
/
synchronousImageTransmission.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# Make sure to have CoppeliaSim running, with following scene loaded:
#
# scenes/messaging/synchronousImageTransmissionViaRemoteApi.ttt
#
# Do not launch simulation, but run this script
#
# The client side (i.e. this script) depends on:
#
# sim.py, simConst.py, and the remote API library available
# in programming/remoteApiBindings/lib/lib
try:
import sim
except:
print ('--------------------------------------------------------------')
print ('"sim.py" could not be imported. This means very probably that')
print ('either "sim.py" or the remoteApi library could not be found.')
print ('Make sure both are in the same folder as this file,')
print ('or appropriately adjust the file "sim.py"')
print ('--------------------------------------------------------------')
print ('')
import math
import time
class Client:
def __enter__(self):
self.intSignalName='legacyRemoteApiStepCounter'
self.stepCounter=0
self.lastImageAcquisitionTime=-1
sim.simxFinish(-1) # just in case, close all opened connections
self.id=sim.simxStart('127.0.0.1',19997,True,True,5000,5) # Connect to CoppeliaSim
return self
def __exit__(self,*err):
sim.simxFinish(-1)
with Client() as client:
client.runInSynchronousMode=True
print("running")
if client.id!=-1:
print ('Connected to remote API server')
def stepSimulation():
if client.runInSynchronousMode:
currentStep=client.stepCounter
sim.simxSynchronousTrigger(client.id);
while client.stepCounter==currentStep:
retCode,s=sim.simxGetIntegerSignal(client.id,client.intSignalName,sim.simx_opmode_buffer)
if retCode==sim.simx_return_ok:
client.stepCounter=s
retCode,res,img=sim.simxGetVisionSensorImage(client.id,client.visionSensorHandle,0,sim.simx_opmode_buffer)
client.lastImageAcquisitionTime=sim.simxGetLastCmdTime(client.id)
if retCode==sim.simx_return_ok:
sim.simxSetVisionSensorImage(client.id,client.passiveVisionSensorHandle,img,0,sim.simx_opmode_oneshot)
else:
retCode,res,img=sim.simxGetVisionSensorImage(client.id,client.visionSensorHandle,0,sim.simx_opmode_buffer)
if retCode==sim.simx_return_ok:
imageSimTime=sim.simxGetLastCmdTime(client.id)
if client.lastImageAcquisitionTime!=imageSimTime:
client.lastImageAcquisitionTime=imageSimTime
sim.simxSetVisionSensorImage(client.id,client.passiveVisionSensorHandle,img,0,sim.simx_opmode_oneshot)
# Start streaming client.intSignalName integer signal, that signals when a step is finished:
sim.simxGetIntegerSignal(client.id,client.intSignalName,sim.simx_opmode_streaming)
res,client.visionSensorHandle=sim.simxGetObjectHandle(client.id,'/VisionSensor',sim.simx_opmode_blocking)
res,client.passiveVisionSensorHandle=sim.simxGetObjectHandle(client.id,'/PassiveVisionSensor',sim.simx_opmode_blocking)
# Start streaming the vision sensor image:
sim.simxGetVisionSensorImage(client.id,client.visionSensorHandle,0,sim.simx_opmode_streaming)
# enable the synchronous mode on the client:
if client.runInSynchronousMode:
sim.simxSynchronous(client.id,True)
sim.simxStartSimulation(client.id,sim.simx_opmode_oneshot)
startTime=time.time()
while time.time()-startTime < 5:
stepSimulation()
# stop data streaming
sim.simxGetIntegerSignal(client.id,client.intSignalName,sim.simx_opmode_discontinue)
sim.simxGetVisionSensorImage(client.id,client.visionSensorHandle,0,sim.simx_opmode_discontinue)
sim.simxStopSimulation(client.id,sim.simx_opmode_blocking)