Skip to content
This repository was archived by the owner on Oct 18, 2021. It is now read-only.

Commit 01af467

Browse files
committed
implement handshake spec
1 parent 14867b2 commit 01af467

File tree

10 files changed

+155
-31
lines changed

10 files changed

+155
-31
lines changed

Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ version = "0.6.3"
3737

3838
[dev-dependencies]
3939
approx = "0.1.1"
40+
serde = "1.0.69"
41+
serde_derive = "1.0.69"
4042

4143
[dev-dependencies.serde_json]
4244
features = ["preserve_order"]

src/cursor.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ impl Cursor {
225225
) -> Result<Cursor> {
226226

227227
// Select a server stream from the topology.
228-
let (stream, slave_ok, send_read_pref) = if cmd_type.is_write_command() {
228+
let (mut stream, slave_ok, send_read_pref) = if cmd_type.is_write_command() {
229229
(try!(client.acquire_write_stream()), false, false)
230230
} else {
231231
try!(client.acquire_stream(read_pref.to_owned()))
@@ -254,7 +254,7 @@ impl Cursor {
254254
};
255255

256256
Cursor::query_with_stream(
257-
stream,
257+
&mut stream,
258258
client,
259259
namespace,
260260
new_flags,
@@ -267,7 +267,7 @@ impl Cursor {
267267
}
268268

269269
pub fn query_with_stream(
270-
stream: PooledStream,
270+
stream: &mut PooledStream,
271271
client: Client,
272272
namespace: String,
273273
flags: OpQueryFlags,
@@ -278,7 +278,6 @@ impl Cursor {
278278
read_pref: Option<ReadPreference>,
279279
) -> Result<Cursor> {
280280

281-
let mut stream = stream;
282281
let socket = stream.get_socket();
283282
let req_id = client.get_req_id();
284283

src/lib.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@ use topology::{Topology, TopologyDescription, TopologyType, DEFAULT_HEARTBEAT_FR
181181
DEFAULT_LOCAL_THRESHOLD_MS, DEFAULT_SERVER_SELECTION_TIMEOUT_MS};
182182
use topology::server::Server;
183183

184+
pub const DRIVER_NAME: &'static str = "mongo-rust-driver-prototype";
185+
184186
/// Interfaces with a MongoDB server or replica set.
185187
pub struct ClientInner {
186188
/// Indicates how a server should be selected for read operations.
@@ -415,11 +417,11 @@ impl ThreadedClient for Client {
415417
&self,
416418
read_preference: ReadPreference,
417419
) -> Result<(PooledStream, bool, bool)> {
418-
self.topology.acquire_stream(read_preference)
420+
self.topology.acquire_stream(self.clone(), read_preference)
419421
}
420422

421423
fn acquire_write_stream(&self) -> Result<PooledStream> {
422-
self.topology.acquire_write_stream()
424+
self.topology.acquire_write_stream(self.clone())
423425
}
424426

425427
fn get_req_id(&self) -> i32 {

src/pool.rs

+61-6
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,13 @@
22
use error::Error::{self, ArgumentError, OperationError};
33
use error::Result;
44

5+
use Client;
6+
use coll::options::FindOptions;
7+
use command_type::CommandType;
58
use connstring::Host;
9+
use cursor::Cursor;
610
use stream::{Stream, StreamConnector};
11+
use wire_protocol::flags::OpQueryFlags;
712

813
use bufstream::BufStream;
914
use std::sync::{Arc, Condvar, Mutex};
@@ -48,6 +53,8 @@ pub struct PooledStream {
4853
wait_lock: Arc<Condvar>,
4954
// The pool iteration at the moment of extraction.
5055
iteration: usize,
56+
// Whether the handshake occurred successfully.
57+
successful_handshake: bool,
5158
}
5259

5360
impl PooledStream {
@@ -59,6 +66,11 @@ impl PooledStream {
5966

6067
impl Drop for PooledStream {
6168
fn drop(&mut self) {
69+
// Don't add streams that couldn't successfully handshake to the pool.
70+
if !self.successful_handshake {
71+
return;
72+
}
73+
6274
// Attempt to lock and return the socket to the pool,
6375
// or give up if the pool lock has been poisoned.
6476
if let Ok(mut locked) = self.pool.lock() {
@@ -117,7 +129,7 @@ impl ConnectionPool {
117129
/// Attempts to acquire a connected socket. If none are available and
118130
/// the pool has not reached its maximum size, a new socket will connect.
119131
/// Otherwise, the function will block until a socket is returned to the pool.
120-
pub fn acquire_stream(&self) -> Result<PooledStream> {
132+
pub fn acquire_stream(&self, client: Client) -> Result<PooledStream> {
121133
let mut locked = try!(self.inner.lock());
122134
if locked.size == 0 {
123135
return Err(OperationError(String::from(
@@ -133,28 +145,32 @@ impl ConnectionPool {
133145
pool: self.inner.clone(),
134146
wait_lock: self.wait_lock.clone(),
135147
iteration: locked.iteration,
148+
successful_handshake: true,
136149
});
137150
}
138151

139152
// Attempt to make a new connection
140153
let len = locked.len.load(Ordering::SeqCst);
141154
if len < locked.size {
142155
let socket = try!(self.connect());
143-
let _ = locked.len.fetch_add(1, Ordering::SeqCst);
144-
return Ok(PooledStream {
156+
let mut stream = PooledStream {
145157
socket: Some(socket),
146158
pool: self.inner.clone(),
147159
wait_lock: self.wait_lock.clone(),
148160
iteration: locked.iteration,
149-
});
161+
successful_handshake: false,
162+
};
163+
164+
self.handshake(client, &mut stream)?;
165+
let _ = locked.len.fetch_add(1, Ordering::SeqCst);
166+
return Ok(stream);
150167
}
151168

152169
// Release lock and wait for pool to be repopulated
153170
locked = try!(self.wait_lock.wait(locked));
154171
}
155172
}
156173

157-
158174
// Connects to a MongoDB server as defined by the initial configuration.
159175
fn connect(&self) -> Result<BufStream<Stream>> {
160176
match self.stream_connector.connect(
@@ -165,4 +181,43 @@ impl ConnectionPool {
165181
Err(e) => Err(Error::from(e)),
166182
}
167183
}
168-
}
184+
185+
// This sends the client metadata to the server as described by the handshake spec.
186+
//
187+
// See https://github.com/mongodb/specifications/blob/master/source/mongodb-handshake/handshake.rst
188+
fn handshake(&self, client: Client, stream: &mut PooledStream) -> Result<()> {
189+
let mut options = FindOptions::new();
190+
options.limit = Some(1);
191+
options.batch_size = Some(1);
192+
193+
let flags = OpQueryFlags::with_find_options(&options);
194+
195+
Cursor::query_with_stream(
196+
stream,
197+
client,
198+
String::from("local.$cmd"),
199+
flags,
200+
doc! {
201+
"isMaster": 1i32,
202+
"client": {
203+
"driver": {
204+
"name": ::DRIVER_NAME,
205+
"version": env!("CARGO_PKG_VERSION"),
206+
},
207+
"os": {
208+
"type": ::std::env::consts::OS,
209+
"architecture": ::std::env::consts::ARCH
210+
}
211+
},
212+
},
213+
options,
214+
CommandType::IsMaster,
215+
false,
216+
None,
217+
)?;
218+
219+
stream.successful_handshake = true;
220+
221+
Ok(())
222+
}
223+
}

src/topology/mod.rs

+18-14
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ impl TopologyDescription {
116116
}
117117

118118
/// Returns the nearest server stream, calculated by round trip time.
119-
fn get_nearest_from_vec(&self, servers: &mut Vec<Host>) -> Result<(PooledStream, ServerType)> {
119+
fn get_nearest_from_vec(&self, client: Client, servers: &mut Vec<Host>) -> Result<(PooledStream, ServerType)> {
120120
servers.sort_by(|a, b| {
121121
let mut a_rtt = i64::MAX;
122122
let mut b_rtt = i64::MAX;
@@ -140,7 +140,7 @@ impl TopologyDescription {
140140
if let Ok(description) = server.description.read() {
141141
if description.round_trip_time.is_none() {
142142
break;
143-
} else if let Ok(stream) = server.acquire_stream() {
143+
} else if let Ok(stream) = server.acquire_stream(client.clone()) {
144144
return Ok((stream, description.server_type));
145145
}
146146
}
@@ -152,13 +152,13 @@ impl TopologyDescription {
152152
}
153153

154154
/// Returns a random server stream from the vector.
155-
fn get_rand_from_vec(&self, servers: &mut Vec<Host>) -> Result<(PooledStream, ServerType)> {
155+
fn get_rand_from_vec(&self, client: Client, servers: &mut Vec<Host>) -> Result<(PooledStream, ServerType)> {
156156
while !servers.is_empty() {
157157
let len = servers.len();
158158
let index = thread_rng().gen_range(0, len);
159159

160160
if let Some(server) = self.servers.get(&servers[index]) {
161-
if let Ok(stream) = server.acquire_stream() {
161+
if let Ok(stream) = server.acquire_stream(client.clone()) {
162162
if let Ok(description) = server.description.read() {
163163
return Ok((stream, description.server_type));
164164
}
@@ -174,6 +174,7 @@ impl TopologyDescription {
174174
/// Returns a server stream for read operations.
175175
pub fn acquire_stream(
176176
&self,
177+
client: Client,
177178
read_preference: &ReadPreference,
178179
) -> Result<(PooledStream, bool, bool)> {
179180
let (mut hosts, rand) = self.choose_hosts(read_preference)?;
@@ -190,7 +191,7 @@ impl TopologyDescription {
190191
if hosts.is_empty() && read_preference.mode == ReadMode::SecondaryPreferred {
191192
let mut read_pref = read_preference.clone();
192193
read_pref.mode = ReadMode::PrimaryPreferred;
193-
return self.acquire_stream(&read_pref);
194+
return self.acquire_stream(client, &read_pref);
194195
}
195196

196197
// If no servers are available, request an update from all monitors.
@@ -205,9 +206,9 @@ impl TopologyDescription {
205206

206207
// Retrieve a server stream from the list of acceptable hosts.
207208
let (pooled_stream, server_type) = if rand {
208-
try!(self.get_rand_from_vec(&mut hosts))
209+
try!(self.get_rand_from_vec(client, &mut hosts))
209210
} else {
210-
try!(self.get_nearest_from_vec(&mut hosts))
211+
try!(self.get_nearest_from_vec(client, &mut hosts))
211212
};
212213

213214
// Determine how to handle server-side logic based on ReadMode and TopologyType.
@@ -251,7 +252,7 @@ impl TopologyDescription {
251252
}
252253

253254
/// Returns a server stream for write operations.
254-
pub fn acquire_write_stream(&self) -> Result<PooledStream> {
255+
pub fn acquire_write_stream(&self, client: Client) -> Result<PooledStream> {
255256
let (mut hosts, rand) = self.choose_write_hosts();
256257

257258
// If no servers are available, request an update from all monitors.
@@ -262,9 +263,9 @@ impl TopologyDescription {
262263
}
263264

264265
if rand {
265-
Ok(try!(self.get_rand_from_vec(&mut hosts)).0)
266+
Ok(try!(self.get_rand_from_vec(client, &mut hosts)).0)
266267
} else {
267-
Ok(try!(self.get_nearest_from_vec(&mut hosts)).0)
268+
Ok(try!(self.get_nearest_from_vec(client, &mut hosts)).0)
268269
}
269270
}
270271

@@ -886,6 +887,7 @@ impl Topology {
886887
// Private server stream acquisition helper.
887888
fn acquire_stream_private(
888889
&self,
890+
client: Client,
889891
read_preference: Option<ReadPreference>,
890892
write: bool,
891893
) -> Result<(PooledStream, bool, bool)> {
@@ -895,12 +897,13 @@ impl Topology {
895897

896898
loop {
897899
let result = if write {
898-
match self.description.read()?.acquire_write_stream() {
900+
match self.description.read()?.acquire_write_stream(client.clone()) {
899901
Ok(stream) => Ok((stream, false, false)),
900902
Err(err) => Err(err),
901903
}
902904
} else {
903905
self.description.read()?.acquire_stream(
906+
client.clone(),
904907
read_preference.as_ref().unwrap(),
905908
)
906909
};
@@ -926,14 +929,15 @@ impl Topology {
926929
/// Returns a server stream for read operations.
927930
pub fn acquire_stream(
928931
&self,
932+
client: Client,
929933
read_preference: ReadPreference,
930934
) -> Result<(PooledStream, bool, bool)> {
931-
self.acquire_stream_private(Some(read_preference), false)
935+
self.acquire_stream_private(client, Some(read_preference), false)
932936
}
933937

934938
/// Returns a server stream for write operations.
935-
pub fn acquire_write_stream(&self) -> Result<PooledStream> {
936-
let (stream, _, _) = try!(self.acquire_stream_private(None, true));
939+
pub fn acquire_write_stream(&self, client: Client) -> Result<PooledStream> {
940+
let (stream, _, _) = try!(self.acquire_stream_private(client, None, true));
937941
Ok(stream)
938942
}
939943
}

src/topology/monitor.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -258,17 +258,16 @@ impl Monitor {
258258
options.limit = Some(1);
259259
options.batch_size = Some(1);
260260

261-
262261
let flags = OpQueryFlags::with_find_options(&options);
263262
let mut filter = bson::Document::new();
264263
filter.insert("isMaster", Bson::I32(1));
265264

266-
let stream = try!(self.personal_pool.acquire_stream());
265+
let mut stream = try!(self.personal_pool.acquire_stream(self.client.clone()));
267266

268267
let time_start = time::get_time();
269268

270269
let cursor = try!(Cursor::query_with_stream(
271-
stream,
270+
&mut stream,
272271
self.client.clone(),
273272
String::from("local.$cmd"),
274273
flags,

src/topology/server.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -247,8 +247,8 @@ impl Server {
247247
}
248248

249249
/// Returns a server stream from the connection pool.
250-
pub fn acquire_stream(&self) -> Result<PooledStream> {
251-
self.pool.acquire_stream()
250+
pub fn acquire_stream(&self, client: Client) -> Result<PooledStream> {
251+
self.pool.acquire_stream(client)
252252
}
253253

254254
/// Request an update from the monitor on the server status.

tests/client/handshake.rs

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
use bson::{self, Bson};
2+
use mongodb::{DRIVER_NAME, Client, ThreadedClient};
3+
use mongodb::db::ThreadedDatabase;
4+
use mongodb::CommandType;
5+
6+
#[derive(Debug, Deserialize)]
7+
struct Metadata {
8+
#[serde(rename = "clientMetadata")]
9+
pub client: ClientMetadata,
10+
}
11+
12+
#[derive(Debug, Deserialize)]
13+
struct ClientMetadata {
14+
pub driver: DriverMetadata,
15+
pub os: OsMetadata,
16+
}
17+
18+
#[derive(Debug, Deserialize)]
19+
struct DriverMetadata {
20+
pub name: String,
21+
pub version: String,
22+
}
23+
24+
#[derive(Debug, Deserialize)]
25+
struct OsMetadata {
26+
#[serde(rename = "type")]
27+
pub os_type: String,
28+
pub architecture: String,
29+
}
30+
31+
#[test]
32+
fn metadata_sent_in_handshake() {
33+
let client = Client::connect("localhost", 27017).unwrap();
34+
let db = client.db("admin");
35+
skip_if_db_version_below!(db, 3, 4);
36+
37+
let result = db.command(doc! { "currentOp" => 1 }, CommandType::Suppressed, None).unwrap();
38+
let in_prog = match result.get("inprog") {
39+
Some(Bson::Array(in_prog)) => in_prog,
40+
_ => panic!("no `inprog` array found in response to `currentOp`"),
41+
};
42+
43+
let metadata: Metadata = bson::from_bson(in_prog[0].clone()).unwrap();
44+
assert_eq!(metadata.client.driver.name, DRIVER_NAME);
45+
}
46+

0 commit comments

Comments
 (0)