diff --git a/docker/Makefile b/docker/Makefile index f9de670..2931b96 100644 --- a/docker/Makefile +++ b/docker/Makefile @@ -3,12 +3,14 @@ build: # https://docs.aws.amazon.com/xray/latest/devguide/xray-daemon-configuration.html # https://docs.aws.amazon.com/xray/latest/devguide/xray-daemon-local.html#xray-daemon-local-docker +# get traces with +# EPOCH=$(date +%s) AWS_PROFILE=dev aws xray get-trace-summaries --start-time $(($EPOCH-60)) --end-time $(($EPOCH)) run: build @docker run \ --rm \ --attach STDOUT \ -v ~/.aws/:/root/.aws/:ro \ - -e AWS_REGION=us-east-2 \ + -e AWS_REGION=us-east-1 \ -e AWS_PROFILE=$(AWS_PROFILE) \ --name xray-daemon \ -p 2000:2000/udp \ diff --git a/rusoto/src/lib.rs b/rusoto/src/lib.rs index 69eacbe..19bf78d 100644 --- a/rusoto/src/lib.rs +++ b/rusoto/src/lib.rs @@ -6,26 +6,32 @@ use rusoto_core::{ signature::SignedRequest, DispatchSignedRequest, }; -use std::{sync::Arc, time::Duration}; -use xray::{Client, Segment, Subsegment}; +use std::time::Duration; +use xray::{ + segment::{AwsOperation, Http, Response}, + OpenSubsegment, Recorder, +}; pub struct TracedRequests { dispatcher: D, - client: Arc, + recorder: Recorder, } impl TracedRequests { /// Create a new tracing dispatcher with a default X-Ray client pub fn new(dispatcher: D) -> Self { - Self::new_with_client(dispatcher, Arc::new(Client::default())) + Self::new_with_recorder(dispatcher, Recorder::default()) } /// Create a new tracing dispatcher with a custom X-Ray client - pub fn new_with_client( + pub fn new_with_recorder( dispatcher: D, - client: Arc, + recorder: Recorder, ) -> Self { - Self { dispatcher, client } + Self { + dispatcher, + recorder, + } } } @@ -49,20 +55,38 @@ where request: SignedRequest, timeout: Option, ) -> Self::Future { - let segment = Segment::begin("test"); - let mut subsegment = - Subsegment::begin(segment.trace_id.clone(), None, request.service.as_str()); - subsegment.namespace = Some("aws".into()); + let mut open = self.recorder.begin_subsegment(request.service.as_ref()); + let operation = request + .headers + .get("x-amz-target") + .and_then(|values| values.iter().next()) + .and_then(|value| { + value + .iter() + .position(|&r| r == b'.') + .and_then(|pos| String::from_utf8(value[pos..].to_vec()).ok()) + }); + + if let Some(sub) = open.subsegment() { + sub.namespace = Some("aws".into()); + let region = Some(request.region.name().into()); + sub.aws = Some(AwsOperation { + operation, + region, + ..AwsOperation::default() + }); + } + TracingRequest( self.dispatcher.dispatch(request, timeout), - subsegment, - self.client.clone(), + self.recorder.clone(), + open, ) } } /** a dispatching request that will be traced if x-ray trace is sampled */ -pub struct TracingRequest(T, Subsegment, Arc); +pub struct TracingRequest(T, Recorder, OpenSubsegment); impl Future for TracingRequest where @@ -73,7 +97,18 @@ where fn poll(&mut self) -> futures::Poll { match self.0.poll() { Ok(futures::Async::Ready(res)) => { - // todo: add tracing + if let Some(sub) = self.2.subsegment() { + sub.http = Some(Http { + response: Some(Response { + status: Some(res.status.as_u16()), + content_length: res + .headers + .get("Content-Length") + .and_then(|value| value.parse::().ok()), + }), + ..Http::default() + }); + } Ok(futures::Async::Ready(res)) } err @ Err(_) => err, diff --git a/xray/src/error.rs b/xray/src/error.rs index 23c9faf..c274e81 100644 --- a/xray/src/error.rs +++ b/xray/src/error.rs @@ -4,8 +4,10 @@ use std::io::Error as IOError; #[derive(Debug, Fail)] pub enum Error { + /// Returned for general IO errors #[fail(display = "IO Error")] IO(IOError), + /// Returned for serialization related errors #[fail(display = "Json Error")] Json(JsonError), } diff --git a/xray/src/lambda.rs b/xray/src/lambda.rs index 9e71565..2bd688b 100644 --- a/xray/src/lambda.rs +++ b/xray/src/lambda.rs @@ -2,18 +2,19 @@ use crate::Header; use std::{ env::var, fs::{create_dir_all, File}, + io::Result as IoResult, }; -pub(crate) fn init() -> std::io::Result<()> { - if taskRoot().is_some() { +pub(crate) fn init() -> IoResult<()> { + if task_root_present() { create_dir_all("/tmp/.aws-xray")?; File::create("/tmp/.aws-xray/initialized")?; } Ok(()) } -pub(crate) fn taskRoot() -> Option { - var("LAMBDA_TASK_ROOT").ok() +pub(crate) fn task_root_present() -> bool { + var("LAMBDA_TASK_ROOT").is_ok() } pub(crate) fn header() -> Option
{ diff --git a/xray/src/lib.rs b/xray/src/lib.rs index 8bfd575..05ffd8d 100644 --- a/xray/src/lib.rs +++ b/xray/src/lib.rs @@ -15,12 +15,18 @@ mod error; mod header; mod hexbytes; mod lambda; -mod segment; +mod recorder; +pub mod segment; mod segment_id; mod trace_id; pub use crate::{ - epoch::Seconds, error::Error, header::Header, segment::*, segment_id::SegmentId, + epoch::Seconds, + error::Error, + header::Header, + recorder::{OpenSegment, OpenSubsegment, Recorder}, + segment::*, + segment_id::SegmentId, trace_id::TraceId, }; @@ -44,7 +50,7 @@ impl Default for Client { .ok() .and_then(|value| value.parse::().ok()) .unwrap_or_else(|| { - log::trace!("No valid `AWS_XRAY_DAEMON_ADDRESS` env variable detected falling back on default"); + log::trace!("No valid `AWS_XRAY_DAEMON_ADDRESS` env variable detected falling back on default: 127.0.0.1:2000"); ([127, 0, 0, 1], 2000).into() }); @@ -53,7 +59,8 @@ impl Default for Client { } impl Client { - const HEADER: &'static [u8] = br#"{"format": "json", "version": 1}\n"#; + const HEADER: &'static [u8] = br#"{"format": "json", "version": 1} +"#; /// Return a new X-Ray client connected /// to the provided `addr` @@ -61,6 +68,7 @@ impl Client { let socket = Arc::new(UdpSocket::bind(&[([0, 0, 0, 0], 0).into()][..])?); socket.set_nonblocking(true)?; socket.connect(&addr)?; + log::trace!("connecting to xray daemon {}", addr); Ok(Client { socket }) } @@ -81,7 +89,12 @@ impl Client { where S: Serialize, { - self.socket.send(&Self::packet(data)?)?; + log::trace!( + "sending trace data {}", + serde_json::to_string_pretty(&data).unwrap_or_default() + ); + let out = self.socket.send(&Self::packet(data)?)?; + log::trace!("send? {:?}", out); Ok(()) } } @@ -89,6 +102,24 @@ impl Client { #[cfg(test)] mod tests { use super::*; + + #[test] + #[ignore] + fn client_can_send_data() { + env_logger::init(); + let mut segment = Segment::begin( + "test-segment", + SegmentId::default(), + None, + TraceId::default(), + ); + std::thread::sleep(std::time::Duration::from_secs(1)); + segment.end(); + if let Err(e) = Client::default().send(&segment) { + assert!(false, "failed to send data: {}", e) + } + } + #[test] fn client_prefixes_packets_with_header() { assert_eq!( @@ -96,7 +127,9 @@ mod tests { "foo": "bar" })) .unwrap(), - br#"{"format": "json", "version": 1}\n{"foo":"bar"}"#.to_vec() + br#"{"format": "json", "version": 1} +{"foo":"bar"}"# + .to_vec() ) } } diff --git a/xray/src/recorder.rs b/xray/src/recorder.rs new file mode 100644 index 0000000..3f2b9f7 --- /dev/null +++ b/xray/src/recorder.rs @@ -0,0 +1,258 @@ +use crate::{Client, Header, Segment, SegmentId, Subsegment, TraceId}; +use serde::Serialize; +use std::{marker::PhantomData, mem, sync::Arc}; +use thread_local_object::ThreadLocal; + +#[derive(Clone, Default, Debug)] +pub struct Context { + trace_id: TraceId, + parent_id: Option, + segment_id: SegmentId, +} + +struct Inner { + current: ThreadLocal, + client: Client, +} + +/// Represents the current state of a (sub)segment context +/// for the current thread +/// +pub struct Current { + recorder: Recorder, + prev: Option, + // make sure this type is !Send since it pokes at thread locals + _p: PhantomData<*const ()>, +} + +unsafe impl Sync for Current {} + +impl Drop for Current { + fn drop(&mut self) { + match self.prev.take() { + Some(prev) => { + self.recorder.0.current.set(prev); + } + None => { + self.recorder.0.current.remove(); + } + } + } +} + +/// An open trace subsegment +/// +/// When dropped, the segment will be recorded +pub struct OpenSubsegment { + current: Current, + context: Context, + state: Option, +} + +impl OpenSubsegment { + fn new( + current: Current, + context: Context, + name: N, + ) -> Self + where + N: Into, + { + let subseg = Subsegment::begin( + name, + context.segment_id.clone(), + context.parent_id.clone(), + context.trace_id.clone(), + ); + + Self { + current, + context, + state: Some(subseg), + } + } + + pub fn subsegment(&mut self) -> &mut Option { + &mut self.state + } +} + +// recipie for emiting should be +// if end of last subseg, emit parent + subseg +// if not lastsubset and parent > 100 subseg +// for each subseg ss +/// if ss.in progress or its subsegs arent help stream them +/// emit subseg and remove from parent +impl Drop for OpenSubsegment { + fn drop(&mut self) { + if let Some(mut subsegment) = mem::replace(&mut self.state, None) { + subsegment.end(); + self.current.recorder.emit(&subsegment); + } + } +} + +/// An open trace subsegment +/// +/// When dropped, the segment will be recorded +pub struct OpenSegment { + current: Current, + context: Context, + state: Option, +} + +impl OpenSegment { + fn new( + current: Current, + context: Context, + name: String, + ) -> Self { + let segment = Segment::begin( + name, + context.segment_id.clone(), + context.parent_id.clone(), + context.trace_id.clone(), + ); + + Self { + current, + context, + state: Some(segment), + } + } +} + +impl Drop for OpenSegment { + fn drop(&mut self) { + if let Some(mut segment) = mem::replace(&mut self.state, None) { + segment.end(); + self.current.recorder.emit(&segment); + } + } +} + +/// A recorder manages the state of a +/// segment and its corresponding subsegments, +/// recording them when appropriate +#[derive(Clone)] +pub struct Recorder(Arc); + +impl Default for Recorder { + fn default() -> Self { + Self(Arc::new(Inner { + current: ThreadLocal::new(), + client: Client::default(), + })) + } +} + +impl Recorder { + fn emit( + &self, + s: &S, + ) where + S: Serialize, + { + if let Err(e) = self.0.client.send(&s) { + log::debug!("error emitting data {:?}", e); + } + } + /// Intended to be used when weaving context through + /// thread contexts. When dropped, the context will be placed + /// in its previous state + pub fn set( + &self, + ctx: Context, + ) -> Current { + Current { + recorder: self.clone(), + prev: self.0.current.set(ctx), + _p: PhantomData, + } + } + + /// Return the current threads current state associated with a trace + pub fn current(&self) -> Option { + self.0.current.get_cloned() + } + + /// Begins a new trace + pub fn begin_segment( + &self, + name: N, + ) -> OpenSegment + where + N: Into, + { + let name = name.into(); + if let Some(current) = self.current() { + log::debug!( + "Beginning new segment while another segment exists in the segment context. Overwriting current segment '{}' to start new segment named '{}'.", + current.segment_id, name + ) + } + let trace_id = TraceId::new(); + let segment_id = SegmentId::new(); + let context = Context { + trace_id, + segment_id, + ..Context::default() + }; + + let current = self.set(context.clone()); + OpenSegment::new(current, context, name) + } + + /// begin a new subsegment which may be the child of another + /// lambda - (immutable parent) https://github.com/aws/aws-xray-sdk-java/blob/3e0b21c5bafec8d0577768cdfc31f4139c4fbecc/aws-xray-recorder-sdk-core/src/main/java/com/amazonaws/xray/contexts/LambdaSegmentContext.java#L36 + /// thread local - https://github.com/aws/aws-xray-sdk-java/blob/3e0b21c5bafec8d0577768cdfc31f4139c4fbecc/aws-xray-recorder-sdk-core/src/main/java/com/amazonaws/xray/contexts/ThreadLocalSegmentContext.java#L20 + pub fn begin_subsegment( + &self, + name: N, + ) -> OpenSubsegment + where + N: Into, + { + let context = match self.current() { + Some(Context { + trace_id, + segment_id, + .. + }) => Context { + trace_id, + parent_id: Some(segment_id), + segment_id: SegmentId::new(), + }, + _ => match crate::lambda::header() { + Some(Header { + trace_id, + parent_id, + .. + }) => Context { + trace_id, + parent_id, + segment_id: SegmentId::new(), + }, + _ => Context::default(), + }, + }; + let current = self.set(context.clone()); + OpenSubsegment::new(current, context, name) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::{thread, time::Duration}; + #[test] + #[ignore] + fn test_recorder() { + let recorder = Recorder::default(); + let a = recorder.begin_segment("test-segment"); + thread::sleep(Duration::from_secs(1)); + let b = recorder.begin_subsegment("subsegment-b"); + thread::sleep(Duration::from_secs(1)); + let c = recorder.begin_subsegment("subsegment-c"); + } +} diff --git a/xray/src/segment.rs b/xray/src/segment.rs index 6e13e99..c67b2af 100644 --- a/xray/src/segment.rs +++ b/xray/src/segment.rs @@ -218,7 +218,12 @@ impl Segment { /// Begins a new named segment /// /// A segment's name should match the domain name or logical name of the service that generates the segment. However, this is not enforced. Any application that has permission to PutTraceSegments can send segments with any name. - pub fn begin(name: N) -> Self + pub fn begin( + name: N, + id: SegmentId, + parent_id: Option, + trace_id: TraceId, + ) -> Self where N: Into, { @@ -228,6 +233,16 @@ impl Segment { } Segment { name: valid_name, + id, + parent_id, + trace_id, + in_progress: true, + aws: Some(Aws { + xray: Some(XRay { + sdk_version: Some(env!("CARGO_PKG_VERSION").into()), + }), + ..Aws::default() + }), ..Segment::default() } } @@ -288,9 +303,10 @@ pub struct Response { impl Subsegment { /// Create a new subsegment pub fn begin( - trace_id: TraceId, - parent_id: Option, name: N, + id: SegmentId, + parent_id: Option, + trace_id: TraceId, ) -> Self where N: Into, @@ -301,6 +317,7 @@ impl Subsegment { } Subsegment { name: valid_name, + id, trace_id: Some(trace_id), parent_id, type_: "subsegment".into(), @@ -440,11 +457,19 @@ mod tests { #[test] fn segments_begin_with_names_with_a_max_len() { - assert_eq!(Segment::begin("short").name, "short"); assert_eq!( - Segment::begin(String::from_utf8_lossy(&[b'X'; 201])) - .name - .len(), + Segment::begin("short", SegmentId::default(), None, TraceId::default()).name, + "short" + ); + assert_eq!( + Segment::begin( + String::from_utf8_lossy(&[b'X'; 201]), + SegmentId::default(), + None, + TraceId::default() + ) + .name + .len(), 200 ); } @@ -452,14 +477,15 @@ mod tests { #[test] fn subsegments_begin_with_names_with_a_max_len() { assert_eq!( - Subsegment::begin(TraceId::default(), None, "short").name, + Subsegment::begin("short", SegmentId::default(), None, TraceId::default()).name, "short" ); assert_eq!( Subsegment::begin( - TraceId::default(), + String::from_utf8_lossy(&[b'X'; 201]), + SegmentId::default(), None, - String::from_utf8_lossy(&[b'X'; 201]) + TraceId::default() ) .name .len(),