diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb
index 600b689d0f1..5f8e35b7670 100644
--- a/logstash-core/lib/logstash/java_pipeline.rb
+++ b/logstash-core/lib/logstash/java_pipeline.rb
@@ -550,6 +550,7 @@ def clear_pipeline_metrics
       collector.clear("stats/pipelines/#{pipeline_id}/plugins")
       collector.clear("stats/pipelines/#{pipeline_id}/events")
       collector.clear("stats/pipelines/#{pipeline_id}/flow")
+      collector.clear("stats/pipelines/#{pipeline_id}/batch")
     end
   end
 
diff --git a/logstash-core/spec/logstash/java_pipeline_spec.rb b/logstash-core/spec/logstash/java_pipeline_spec.rb
index fa2ededdcad..0594e13407c 100644
--- a/logstash-core/spec/logstash/java_pipeline_spec.rb
+++ b/logstash-core/spec/logstash/java_pipeline_spec.rb
@@ -1665,18 +1665,30 @@ def build_pipeline_string_config(dummyinput_config)
     end
 
     context "of a running pipeline" do
-      let(:pipeline_settings) { { "pipeline.batch.size" => 1, "pipeline.workers" => 1, "pipeline.id" => pipeline_id, "metric.collect" => true } }
+      let(:pipeline_settings) { {
+        "pipeline.batch.size" => 1,
+        "pipeline.workers" => 1,
+        "pipeline.id" => pipeline_id,
+        "metric.collect" => true,
+        "pipeline.batch.metrics.sampling_mode" => "full"
+      } }
 
       it "should clear the pipeline metrics" do
        dummyinput.keep_running.make_true
 
        expect { pipeline.start }.to_not raise_error
 
+       collected_metric = subject.metric.collector.snapshot_metric.metric_store.get_with_path("stats/pipelines")
+       expect(collected_metric[:stats][:pipelines][pipeline_id.to_sym]).to include(:batch)
+
        pipeline.shutdown
        expect(pipeline).to have_received(:clear_pipeline_metrics)
        expect(pipeline).to have_received(:stop_inputs)
        expect(pipeline).to have_received(:wait_for_shutdown)
+
+       # Take a snapshot from metrics and verify the batch metrics have been cleared
+       collected_metric = subject.metric.collector.snapshot_metric.metric_store.get_with_path("stats/pipelines")
+       expect(collected_metric[:stats][:pipelines][pipeline_id.to_sym]).to_not include(:batch)
      end
    end