
Conversation

@belimawr belimawr commented Sep 12, 2025

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding changes to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.


How to test this PR locally

Manual test

  1. Create a log file with at least 1 KB of data
    docker run -it --rm mingrammer/flog -n 20 > /tmp/flog.log
    
  2. Start Filebeat with the following configuration
    filebeat.inputs:
      - type: log
        id: log-as-filestream
        allow_deprecated_use: true
        paths:
          - /tmp/flog.log
    
    output.file:
      path: "${path.home}"
      filename: output
      rotate_on_startup: false  
    
    queue.mem:
      flush.timeout: 0
    
    logging:
      to_stderr: true
      level: debug
      selectors:
        - "*"
    
    #features:
    #  log_input_run_as_filestream:
    #    enabled: true

  3. Look at the logs, you will see some logs from the Log input
    {
      "log.level": "debug",
      "@timestamp": "2025-09-12T12:28:06.899-0400",
      "log.logger": "input.harvester",
      "log.origin": {
        "function": "github.com/elastic/beats/v7/filebeat/input/log.(*Log).Read",
        "file.name": "log/log.go",
        "file.line": 111
      },
      "message": "End of file reached: /tmp/flog.log; Backoff now.",
      "service.name": "filebeat",
      "input_id": "94a20b13-6927-4ff4-8f99-4f750469ed96",
      "source_file": "/tmp/flog.log",
      "state_id": "native::26052-40",
      "finished": false,
      "os_id": "26052-40",
      "harvester_id": "69128be5-d1f4-4493-935a-889d0461c95d",
      "ecs.version": "1.6.0"
    }
  4. Stop Filebeat
  5. Check the number of events published
    % wc -l output-*.ndjson
    20 output-20250912.ndjson

  6. Uncomment the features: section in the configuration

  7. Start Filebeat again, you'll see some logs from the Filestream input
    {
      "log.level": "debug",
      "@timestamp": "2025-09-12T12:31:07.586-0400",
      "log.logger": "input.filestream",
      "log.origin": {
        "function": "github.com/elastic/beats/v7/filebeat/input/filestream.(*logFile).Read",
        "file.name": "filestream/filestream.go",
        "file.line": 139
      },
      "message": "End of file reached: /tmp/flog.log; Backoff now.",
      "service.name": "filebeat",
      "id": "log-as-filestream",
      "source_file": "filestream::log-as-filestream::fingerprint::445d01af94a604742ab7bb9db8b5bceff4b780925c2f8c7729165076319fc016",
      "path": "/tmp/flog.log",
      "state-id": "fingerprint::445d01af94a604742ab7bb9db8b5bceff4b780925c2f8c7729165076319fc016",
      "ecs.version": "1.6.0"
    }
  8. Check the number of events published, it should still be 20
    % wc -l output-*.ndjson
    20 output-20250912.ndjson
    

Elastic Agent

  1. Create a log file with some lines
    docker run -it --rm mingrammer/flog -n 20 > /tmp/flog.log

  2. Run a standalone Elastic Agent with the following configuration (adjust the output settings as necessary)

    elastic-agent.yml

    outputs:
      default:
        type: elasticsearch
        hosts:
          - https://localhost:9200
        username: "elastic"
        password: "changeme"
        preset: latency
        ssl.verification_mode: none
    
    inputs:
      - type: log
        id: your-input-id
        streams:
          - id: your-log-stream-id
            data_stream:
              dataset: generic
            # run_as_filestream: true
            paths:
              - /tmp/flog.log
    
    agent.monitoring:
      enabled: false
      logs: false
      metrics: false
    
    agent.logging:
      level: debug
      to_stderr: true

  3. Ensure all events have been ingested

  4. Look at the logs, you will see Log input logs as described in the manual test

  5. Stop the Elastic Agent

  6. Uncomment run_as_filestream: true from the configuration

  7. Start the Elastic Agent again

  8. Ensure no new data is added to the output (i.e., no data duplication).

  9. Look at the logs, you will see Filestream input logs as described in the manual test

  10. You can also collect the diagnostics and look at the registry

    1. Collect the diagnostics and extract it
    2. Go to components/log-default
    3. Extract the registry: tar -xf registry.tar.gz
    4. cat registry/filebeat/log.json | jq -Sc
    5. You will see the entries from the Filestream input starting with the same offset as the ones from the Log input
      {"id":3,"op":"set"}
      {"k":"filebeat::logs::native::16-50","v":{"FileStateOS":{"device":50,"inode":16},"id":"native::16-50","identifier_name":"native","offset":2113,"prev_id":"","source":"/tmp/flog.log","timestamp":[280186759520503,1762292780],"ttl":-1,"type":"log"}}
      {"id":4,"op":"set"}
      {"k":"filestream::your-log-stream-id::native::16-50","v":{"cursor":{"offset":2113},"meta":{"identifier_name":"native","source":"/tmp/flog.log"},"ttl":-1,"updated":[281470681743360,18446744011573954816]}}
      {"id":5,"op":"remove"}
      {"k":"filebeat::logs::native::16-50"}
      {"id":6,"op":"set"}
      {"k":"filestream::your-log-stream-id::native::16-50","v":{"cursor":{"offset":2113},"meta":{"identifier_name":"native","source":"/tmp/flog.log"},"ttl":-1,"updated":[281470681743360,18446744011573954816]}}

Run the tests

cd filebeat
mage clean
go test -v -count=1 -run=TestRunAsFilestream ./input/logv2
mage BuildSystemTestBinary 
go test -v -count=1 -tags=integration -run=TestLogAsFilestream ./tests/integration 

cd ../x-pack/filebeat
mage clean
mage BuildSystemTestBinary
go test -v -count=1 -tags=integration -run=TestLogAsFilestream ./tests/integration


@belimawr belimawr self-assigned this Sep 12, 2025
@belimawr belimawr added the Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team label Sep 12, 2025
@botelastic botelastic bot added needs_team Indicates that the issue/PR needs a Team:* label and removed needs_team Indicates that the issue/PR needs a Team:* label labels Sep 12, 2025
@github-actions

🤖 GitHub comments


Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)

@mergify

mergify bot commented Sep 12, 2025

This pull request does not have a backport label.
If this is a bug or security fix, could you label this PR @belimawr? 🙏.
For such, you'll need to label your PR with:

  • The upcoming major version of the Elastic Stack
  • The upcoming minor version of the Elastic Stack (if you're not pushing a breaking change)

To fix up this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit
  • backport-active-all is the label that automatically backports to all active branches.
  • backport-active-8 is the label that automatically backports to all active minor branches for the 8 major.
  • backport-active-9 is the label that automatically backports to all active minor branches for the 9 major.

@belimawr belimawr changed the title [WIP] PoC to run Filestream as log input [PoC] Filestream running as log input Sep 12, 2025
@belimawr belimawr marked this pull request as ready for review November 6, 2025 16:34
@belimawr belimawr requested review from a team as code owners November 6, 2025 16:34
@elasticmachine

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

@cmacknz

cmacknz commented Nov 6, 2025

Have you tested this with the inputs when running as beats receivers? It is critically important that this change has no impact on that conversion right now.

  1. Enabling this feature flag, and then running the log input as a beat receiver later just works.
  2. Running the log input as a beat receiver first, and then enabling the flag just works.
  3. Running the log input as a beat receiver first, and then enabling and disabling the flag just works.

Think of any cases where these changes could interact and make sure there is no impact.

@belimawr

belimawr commented Nov 7, 2025

> Have you tested this with the inputs when running as beats receivers? It is critically important that this change has no impact on that conversion right now.
>
>   1. Enabling this feature flag, and then running the log input as a beat receiver later just works.
>   2. Running the log input as a beat receiver first, and then enabling the flag just works.
>   3. Running the log input as a beat receiver first, and then enabling and disabling the flag just works.
>
> Think of any cases where these changes could interact and make sure there is no impact.

It should have no effect because the changes are limited to input instantiation. Anyway, I'll create tests for those scenarios as well.

@AndersonQ AndersonQ left a comment

I left a few questions

Comment on lines 30 to 45
**`log.file.device_id`**
: The device ID of the log file; it is used by the 'native' file identity.

type: keyword

required: False


**`log.file.inode`**
: The inode of the log file; it is used by the 'native' file identity.

type: long

required: False


Member

Out of curiosity, is this related to this PR?

Contributor Author

Some Python tests started failing because those fields were not in fields.yml, so I added them there, which auto-generates this documentation.

I'm debating whether I should just modify the test to ignore those fields 🤔

Comment on lines +65 to +69
// Only allow running the Log input as Filestream if Filebeat
// is running under Elastic Agent.
if !management.UnderAgent() {
	return false, nil
}
Member

Shouldn't it come before the features.LogInputRunFilestream() check, or be combined with it, as in features.LogInputRunFilestream() && management.UnderAgent()?

Member

Otherwise, it can run as filestream even if it isn't under agent.

Contributor Author

Otherwise, it can run as filestream even if it isn't under agent.

That's the whole point: the feature flag overrides everything. This is required to have all the Log input tests run with Filestream, and it also gives us the flexibility to test, or even use, the feature without Elastic Agent.
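The precedence being discussed could be sketched as follows; shouldRunAsFilestream and its parameters are illustrative stand-ins for features.LogInputRunFilestream, the input's run_as_filestream setting, and management.UnderAgent, not the actual code:

```go
package main

import "fmt"

// shouldRunAsFilestream sketches the decision: the feature flag
// overrides everything; without it, the per-input setting only takes
// effect when Filebeat is running under Elastic Agent.
func shouldRunAsFilestream(featureFlag, inputSetting, underAgent bool) bool {
	if featureFlag {
		return true
	}
	return inputSetting && underAgent
}

func main() {
	fmt.Println(shouldRunAsFilestream(true, false, false)) // → true (flag wins)
	fmt.Println(shouldRunAsFilestream(false, true, false)) // → false (not under agent)
	fmt.Println(shouldRunAsFilestream(false, true, true))  // → true
}
```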

Member

Then, would you mind explaining it a bit better? Perhaps document on the feature flag that if it's set, the input runs as Filestream; if it isn't set, Filebeat needs to be running under agent.

Member

Otherwise, it isn't clear whether it was intentional or a mistake.

Comment on lines 128 to 132
// PluginV2 returns a v2.Plugin with a manager that checks whether
// the config is from a Log input that should run as Filestream.
// If that is the case the Log input configuration is converted to
// Filestream and the Filestream input returned.
// Otherwise v2.ErrUnknownInput is returned.
Member

It seems outdated; no error is returned.

Contributor Author

That's referring to the manager's behaviour. I'll clarify/rewrite it.

Comment on lines 164 to 165
// When inputs are created, inputs V2 are tried first, so if we
// are supposed to run as the Log input, return v2.ErrUnknownInput
Member

I'm confused. Above you say the input V1 might return v2.ErrUnknownInput so Filestream takes over; here you say V2 returns v2.ErrUnknownInput so it runs as the Log input.

Is the doc outdated or did I miss something?

Contributor Author

I see how it can be confusing; I'll clarify the comments by not mentioning the expected behaviour of other parts of Filebeat.

Inputs V2 are tried first on a standalone Filebeat, but I believe I've seen the opposite happening. Regardless, both V1 and V2 inputs will always return v2.ErrUnknownInput if they should not start an input.

Comment on lines +188 to +190
# if the test file contains '.journal', later it will try to remove
# the '--once' flag and the journald input will be used,
# so there is nothing to do here.
Member

Sorry, what do you mean? I didn't understand what you're trying to clarify here

Contributor Author

This whole test is very complex :/

It can work with log files and journal files; for journal files there is some special code. The variable test_file contains the full name of the file the test is going to ingest: if it contains .journal in the name, we know the test is ingesting a journal file, and therefore the journald input will be used.

When adding the changes for the Log input running as Filestream, some of the special handling conflicted, so I added ".journal" not in test_file to this condition to avoid problems with the cmd.remove("--once") that is done here and further down on L207-L208.

@belimawr

elastic/elastic-package#3012 should help testing this PR with Integrations.


Labels

  • backport-skip: Skip notification from the automated backport with mergify
  • skip-changelog
  • Team:Elastic-Agent-Data-Plane: Label for the Agent Data Plane team
