Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docker/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ build:

# https://docs.aws.amazon.com/xray/latest/devguide/xray-daemon-configuration.html
# https://docs.aws.amazon.com/xray/latest/devguide/xray-daemon-local.html#xray-daemon-local-docker
# get traces with
# EPOCH=$(date +%s) AWS_PROFILE=dev aws xray get-trace-summaries --start-time $(($EPOCH-60)) --end-time $(($EPOCH))
run: build
@docker run \
--rm \
--attach STDOUT \
-v ~/.aws/:/root/.aws/:ro \
-e AWS_REGION=us-east-2 \
-e AWS_REGION=us-east-1 \
-e AWS_PROFILE=$(AWS_PROFILE) \
--name xray-daemon \
-p 2000:2000/udp \
Expand Down
65 changes: 50 additions & 15 deletions rusoto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,32 @@ use rusoto_core::{
signature::SignedRequest,
DispatchSignedRequest,
};
use std::{sync::Arc, time::Duration};
use xray::{Client, Segment, Subsegment};
use std::time::Duration;
use xray::{
segment::{AwsOperation, Http, Response},
OpenSubsegment, Recorder,
};

pub struct TracedRequests<D> {
dispatcher: D,
client: Arc<Client>,
recorder: Recorder,
}

impl<D> TracedRequests<D> {
/// Create a new tracing dispatcher with a default X-Ray client
pub fn new(dispatcher: D) -> Self {
Self::new_with_client(dispatcher, Arc::new(Client::default()))
Self::new_with_recorder(dispatcher, Recorder::default())
}

/// Create a new tracing dispatcher with a custom X-Ray client
pub fn new_with_client(
pub fn new_with_recorder(
dispatcher: D,
client: Arc<Client>,
recorder: Recorder,
) -> Self {
Self { dispatcher, client }
Self {
dispatcher,
recorder,
}
}
}

Expand All @@ -49,20 +55,38 @@ where
request: SignedRequest,
timeout: Option<Duration>,
) -> Self::Future {
let segment = Segment::begin("test");
let mut subsegment =
Subsegment::begin(segment.trace_id.clone(), None, request.service.as_str());
subsegment.namespace = Some("aws".into());
let mut open = self.recorder.begin_subsegment(request.service.as_ref());
let operation = request
.headers
.get("x-amz-target")
.and_then(|values| values.iter().next())
.and_then(|value| {
value
.iter()
.position(|&r| r == b'.')
.and_then(|pos| String::from_utf8(value[pos..].to_vec()).ok())
});

if let Some(sub) = open.subsegment() {
sub.namespace = Some("aws".into());
let region = Some(request.region.name().into());
sub.aws = Some(AwsOperation {
operation,
region,
..AwsOperation::default()
});
}

TracingRequest(
self.dispatcher.dispatch(request, timeout),
subsegment,
self.client.clone(),
self.recorder.clone(),
open,
)
}
}

/** a dispatching request that will be traced if x-ray trace is sampled */
pub struct TracingRequest<T>(T, Subsegment, Arc<Client>);
pub struct TracingRequest<T>(T, Recorder, OpenSubsegment);

impl<T> Future for TracingRequest<T>
where
Expand All @@ -73,7 +97,18 @@ where
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
match self.0.poll() {
Ok(futures::Async::Ready(res)) => {
// todo: add tracing
if let Some(sub) = self.2.subsegment() {
sub.http = Some(Http {
response: Some(Response {
status: Some(res.status.as_u16()),
content_length: res
.headers
.get("Content-Length")
.and_then(|value| value.parse::<u64>().ok()),
}),
..Http::default()
});
}
Ok(futures::Async::Ready(res))
}
err @ Err(_) => err,
Expand Down
2 changes: 2 additions & 0 deletions xray/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ use std::io::Error as IOError;

#[derive(Debug, Fail)]
pub enum Error {
/// Returned for general IO errors
#[fail(display = "IO Error")]
IO(IOError),
/// Returned for serialization related errors
#[fail(display = "Json Error")]
Json(JsonError),
}
Expand Down
9 changes: 5 additions & 4 deletions xray/src/lambda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@ use crate::Header;
use std::{
env::var,
fs::{create_dir_all, File},
io::Result as IoResult,
};

pub(crate) fn init() -> std::io::Result<()> {
if taskRoot().is_some() {
pub(crate) fn init() -> IoResult<()> {
if task_root_present() {
create_dir_all("/tmp/.aws-xray")?;
File::create("/tmp/.aws-xray/initialized")?;
}
Ok(())
}

pub(crate) fn taskRoot() -> Option<String> {
var("LAMBDA_TASK_ROOT").ok()
pub(crate) fn task_root_present() -> bool {
var("LAMBDA_TASK_ROOT").is_ok()
}

pub(crate) fn header() -> Option<Header> {
Expand Down
45 changes: 39 additions & 6 deletions xray/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@ mod error;
mod header;
mod hexbytes;
mod lambda;
mod segment;
mod recorder;
pub mod segment;
mod segment_id;
mod trace_id;

pub use crate::{
epoch::Seconds, error::Error, header::Header, segment::*, segment_id::SegmentId,
epoch::Seconds,
error::Error,
header::Header,
recorder::{OpenSegment, OpenSubsegment, Recorder},
segment::*,
segment_id::SegmentId,
trace_id::TraceId,
};

Expand All @@ -44,7 +50,7 @@ impl Default for Client {
.ok()
.and_then(|value| value.parse::<SocketAddr>().ok())
.unwrap_or_else(|| {
log::trace!("No valid `AWS_XRAY_DAEMON_ADDRESS` env variable detected falling back on default");
log::trace!("No valid `AWS_XRAY_DAEMON_ADDRESS` env variable detected falling back on default: 127.0.0.1:2000");
([127, 0, 0, 1], 2000).into()
});

Expand All @@ -53,14 +59,16 @@ impl Default for Client {
}

impl Client {
const HEADER: &'static [u8] = br#"{"format": "json", "version": 1}\n"#;
const HEADER: &'static [u8] = br#"{"format": "json", "version": 1}
"#;

/// Return a new X-Ray client connected
/// to the provided `addr`
pub fn new(addr: SocketAddr) -> Result<Self> {
let socket = Arc::new(UdpSocket::bind(&[([0, 0, 0, 0], 0).into()][..])?);
socket.set_nonblocking(true)?;
socket.connect(&addr)?;
log::trace!("connecting to xray daemon {}", addr);
Ok(Client { socket })
}

Expand All @@ -81,22 +89,47 @@ impl Client {
where
S: Serialize,
{
self.socket.send(&Self::packet(data)?)?;
log::trace!(
"sending trace data {}",
serde_json::to_string_pretty(&data).unwrap_or_default()
);
let out = self.socket.send(&Self::packet(data)?)?;
log::trace!("send? {:?}", out);
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
#[ignore]
fn client_can_send_data() {
env_logger::init();
let mut segment = Segment::begin(
"test-segment",
SegmentId::default(),
None,
TraceId::default(),
);
std::thread::sleep(std::time::Duration::from_secs(1));
segment.end();
if let Err(e) = Client::default().send(&segment) {
assert!(false, "failed to send data: {}", e)
}
}

#[test]
fn client_prefixes_packets_with_header() {
assert_eq!(
Client::packet(serde_json::json!({
"foo": "bar"
}))
.unwrap(),
br#"{"format": "json", "version": 1}\n{"foo":"bar"}"#.to_vec()
br#"{"format": "json", "version": 1}
{"foo":"bar"}"#
.to_vec()
)
}
}
Loading