diff --git a/src/main/java/com/bealetech/metrics/reporting/StatsdReporter.java b/src/main/java/com/bealetech/metrics/reporting/StatsdReporter.java index 6d4cdb2..abe1dbd 100644 --- a/src/main/java/com/bealetech/metrics/reporting/StatsdReporter.java +++ b/src/main/java/com/bealetech/metrics/reporting/StatsdReporter.java @@ -33,6 +33,8 @@ public class StatsdReporter extends AbstractPollingReporter implements MetricProcessor { + private static final int MAX_UDPDATAGRAM_LENGTH = 1300; // In reality, usually closer to 1500 + public static enum StatType { COUNTER, TIMER, GAUGE } private static final Logger LOG = LoggerFactory.getLogger(StatsdReporter.class); @@ -41,8 +43,12 @@ public static enum StatType { COUNTER, TIMER, GAUGE } protected final MetricPredicate predicate; protected final Locale locale = Locale.US; protected final Clock clock; + protected final UDPSocketProvider socketProvider; + protected DatagramSocket currentSocket = null; + protected final VirtualMachineMetrics vm; + protected Writer writer; protected ByteArrayOutputStream outputData; @@ -110,24 +116,19 @@ public void setPrintVMMetrics(boolean printVMMetrics) { @Override public void run() { - DatagramSocket socket = null; + try { - socket = this.socketProvider.get(); - outputData.reset(); - prependNewline = false; - writer = new BufferedWriter(new OutputStreamWriter(this.outputData)); + currentSocket = this.socketProvider.get(); + resetWriterState(); final long epoch = clock.time() / 1000; - if (this.printVMMetrics) { + if (printVMMetrics) { printVmMetrics(epoch); } printRegularMetrics(epoch); // Send UDP data - writer.flush(); - DatagramPacket packet = this.socketProvider.newPacket(outputData); - packet.setData(outputData.toByteArray()); - socket.send(packet); + sendDatagram(); } catch (Exception e) { if (LOG.isDebugEnabled()) { LOG.debug("Error writing to Graphite", e); @@ -142,13 +143,28 @@ public void run() { } } } finally { - if (socket != null) { - socket.close(); + if (currentSocket != null) { + currentSocket.close(); } writer = null; } } + private void resetWriterState() { + outputData.reset(); + prependNewline = false; + writer = new BufferedWriter(new OutputStreamWriter(this.outputData)); + } + + private void sendDatagram() throws IOException { + writer.flush(); + if (outputData.size() > 0) { // Don't send an empty datagram + DatagramPacket packet = this.socketProvider.newPacket(outputData); + packet.setData(outputData.toByteArray()); + currentSocket.send(packet); + } + } + protected void printVmMetrics(long epoch) { // Memory sendFloat("jvm.memory.totalInit", StatType.GAUGE, vm.totalInit()); @@ -321,6 +337,12 @@ protected void sendData(String name, String value, StatType statType) { writer.write(statTypeStr); prependNewline = true; writer.flush(); + + if (outputData.size() > MAX_UDPDATAGRAM_LENGTH) { + // Need to send our UDP packet now before it gets too big. + sendDatagram(); + resetWriterState(); + } } catch (IOException e) { LOG.error("Error sending to Graphite:", e); }