-
Notifications
You must be signed in to change notification settings - Fork 527
[connectors-sdk] Add ObservedData to models (#5397) #5401
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+291
−2
Merged
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| """ObservedData.""" | ||
|
|
||
| from connectors_sdk.models.associated_file import AssociatedFile | ||
| from connectors_sdk.models.base_identified_entity import BaseIdentifiedEntity | ||
| from pycti import ObservedData as PyctiObservedData | ||
| from pydantic import AwareDatetime, Field | ||
| from stix2.v21 import ObservedData as Stix2ObservedData | ||
|
|
||
|
|
||
| class ObservedData(BaseIdentifiedEntity): | ||
| """Base class for OpenCTI observed data.""" | ||
|
|
||
| first_observed: AwareDatetime = Field( | ||
| description="The beginning of the time window during which the data was seen.", | ||
| ) | ||
| last_observed: AwareDatetime = Field( | ||
| description="The end of the time window during which the data was seen.", | ||
| ) | ||
| number_observed: int = Field( | ||
| gt=0, | ||
| description="The number of times that each Cyber-observable object was observed.", | ||
| ) | ||
| entities: list[BaseIdentifiedEntity] = Field( | ||
| min_length=1, | ||
| description="List of OpenCTI identified entities observed.", | ||
| ) | ||
| labels: list[str] | None = Field( | ||
| default=None, | ||
| description="Labels of the observed data", | ||
| ) | ||
| associated_files: list[AssociatedFile] | None = Field( | ||
| default=None, | ||
| description="Files to upload with the observed data, e.g. observed data as a PDF.", | ||
| ) | ||
|
|
||
| def to_stix2_object(self) -> Stix2ObservedData: | ||
| """Make stix object.""" | ||
| object_refs = [obj.id for obj in self.entities] | ||
| return Stix2ObservedData( | ||
| id=PyctiObservedData.generate_id(object_refs), | ||
| first_observed=self.first_observed, | ||
| last_observed=self.last_observed, | ||
| number_observed=self.number_observed, | ||
| object_refs=object_refs, | ||
| labels=self.labels, | ||
| x_opencti_files=[ | ||
| file.to_stix2_object() for file in self.associated_files or [] | ||
| ], | ||
| allow_custom=True, | ||
| **self._common_stix2_properties() | ||
| ) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,237 @@ | ||
| """Tests for ObservedData model.""" | ||
|
|
||
| from datetime import datetime, timezone | ||
|
|
||
| import pytest | ||
| from connectors_sdk.models import URL, BaseIdentifiedEntity, IPV4Address, ObservedData | ||
| from pydantic import ValidationError | ||
| from stix2.v21 import ObservedData as Stix2ObservedData | ||
|
|
||
|
|
||
| def test_observed_data_is_a_base_identified_entity(): | ||
| """Test that ObservedData is a BaseIdentifiedEntity.""" | ||
| assert issubclass(ObservedData, BaseIdentifiedEntity) | ||
|
|
||
|
|
||
| def test_observed_data_requires_first_observed(): | ||
| """Test that ObservedData requires first_observed field.""" | ||
| input_data = { | ||
| "last_observed": datetime(2025, 1, 1, 13, 0, 0, tzinfo=timezone.utc), | ||
| "number_observed": 5, | ||
| "entities": [IPV4Address(value="1.1.1.1")], | ||
| } | ||
| with pytest.raises(ValidationError) as error: | ||
| ObservedData.model_validate(input_data) | ||
| assert "first_observed" in str(error.value) | ||
|
|
||
|
|
||
| def test_observed_data_requires_last_observed(): | ||
| """Test that ObservedData requires last_observed field.""" | ||
| input_data = { | ||
| "first_observed": datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), | ||
| "number_observed": 5, | ||
| "entities": [IPV4Address(value="1.1.1.1")], | ||
| } | ||
| with pytest.raises(ValidationError) as error: | ||
| ObservedData.model_validate(input_data) | ||
| assert "last_observed" in str(error.value) | ||
|
|
||
|
|
||
| def test_observed_data_requires_number_observed(): | ||
| """Test that ObservedData requires number_observed field.""" | ||
| input_data = { | ||
| "first_observed": datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), | ||
| "last_observed": datetime(2025, 1, 1, 13, 0, 0, tzinfo=timezone.utc), | ||
| "entities": [IPV4Address(value="1.1.1.1")], | ||
| } | ||
| with pytest.raises(ValidationError) as error: | ||
| ObservedData.model_validate(input_data) | ||
| assert "number_observed" in str(error.value) | ||
|
|
||
|
|
||
| def test_observed_data_requires_entities(): | ||
| """Test that ObservedData requires entities field.""" | ||
| input_data = { | ||
| "first_observed": datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), | ||
| "last_observed": datetime(2025, 1, 1, 13, 0, 0, tzinfo=timezone.utc), | ||
| "number_observed": 5, | ||
| } | ||
| with pytest.raises(ValidationError) as error: | ||
| ObservedData.model_validate(input_data) | ||
| assert "entities" in str(error.value) | ||
|
|
||
|
|
||
| def test_observed_data_should_not_accept_empty_entities(): | ||
| """Test that ObservedData cannot be created with empty entities list.""" | ||
| input_data = { | ||
| "first_observed": datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), | ||
| "last_observed": datetime(2025, 1, 1, 13, 0, 0, tzinfo=timezone.utc), | ||
| "number_observed": 5, | ||
| "entities": [], | ||
| } | ||
| with pytest.raises(ValidationError) as error: | ||
| ObservedData.model_validate(input_data) | ||
| assert "entities" in str(error.value) | ||
|
|
||
|
|
||
| def test_observed_data_should_not_accept_none_entities(): | ||
| """Test that ObservedData cannot be created with None entities.""" | ||
| # Given valid input data for ObservedData with entities = None | ||
| input_data = { | ||
| "first_observed": datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), | ||
| "last_observed": datetime(2025, 1, 1, 13, 0, 0, tzinfo=timezone.utc), | ||
| "number_observed": 5, | ||
| "entities": None, | ||
| } | ||
| with pytest.raises(ValidationError) as error: | ||
| ObservedData.model_validate(input_data) | ||
| assert "entities" in str(error.value) | ||
|
|
||
|
|
||
| def test_observed_data_should_not_accept_invalid_input(): | ||
| """Test that ObservedData should not accept invalid input.""" | ||
| input_data = { | ||
| "first_observed": datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), | ||
| "last_observed": datetime(2025, 1, 1, 13, 0, 0, tzinfo=timezone.utc), | ||
| "number_observed": 5, | ||
| "entities": [IPV4Address(value="1.1.1.1")], | ||
| "invalid_key": "invalid_value", | ||
| } | ||
| with pytest.raises(ValidationError) as error: | ||
| ObservedData.model_validate(input_data) | ||
| assert "invalid_key" in str(error.value) | ||
|
|
||
|
|
||
| def test_observed_data_should_not_accept_incoherent_dates(): | ||
| """Test that ObservedData should not accept incoherent dates.""" | ||
| input_data = { | ||
| "first_observed": datetime(2025, 1, 2, 12, 0, 0, tzinfo=timezone.utc), | ||
| "last_observed": datetime(2025, 1, 1, 13, 0, 0, tzinfo=timezone.utc), | ||
| "number_observed": 5, | ||
| "entities": [IPV4Address(value="1.1.1.1")], | ||
| } | ||
| with pytest.raises(ValidationError) as error: | ||
| ObservedData.model_validate(input_data) | ||
| assert ( | ||
| "'last_observed' must be greater than or equal to 'first_observed'" | ||
| in str(error.value) | ||
| ) | ||
|
|
||
|
|
||
| def test_observed_data_accepts_equal_dates(): | ||
| """Test that ObservedData accepts first_observed equal to last_observed.""" | ||
| input_data = { | ||
| "first_observed": datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), | ||
| "last_observed": datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), | ||
| "number_observed": 5, | ||
| "entities": [IPV4Address(value="1.1.1.1")], | ||
| } | ||
| observed_data = ObservedData.model_validate(input_data) | ||
| assert observed_data.first_observed == observed_data.last_observed | ||
|
|
||
|
|
||
| def test_observed_data_should_not_accept_zero_number_observed(): | ||
| """Test that ObservedData rejects zero number_observed.""" | ||
| input_data = { | ||
| "first_observed": datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), | ||
| "last_observed": datetime(2025, 1, 1, 13, 0, 0, tzinfo=timezone.utc), | ||
| "number_observed": 0, | ||
| "entities": [IPV4Address(value="1.1.1.1")], | ||
| } | ||
| with pytest.raises(ValidationError) as error: | ||
| ObservedData.model_validate(input_data) | ||
| assert "Input should be greater than 0" in str(error.value) | ||
|
|
||
|
|
||
| def test_observed_data_should_not_accept_negative_number_observed(): | ||
| """Test that ObservedData rejects negative number_observed.""" | ||
| input_data = { | ||
| "first_observed": datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), | ||
| "last_observed": datetime(2025, 1, 1, 13, 0, 0, tzinfo=timezone.utc), | ||
| "number_observed": -5, | ||
| "entities": [IPV4Address(value="1.1.1.1")], | ||
| } | ||
| with pytest.raises(ValidationError) as error: | ||
| ObservedData.model_validate(input_data) | ||
| assert "Input should be greater than 0" in str(error.value) | ||
|
|
||
|
|
||
| def test_observed_data_with_single_object(): | ||
| """Test that ObservedData can be created with a single object.""" | ||
| ipv4 = IPV4Address(value="192.168.1.1") | ||
| input_data = { | ||
| "first_observed": datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), | ||
| "last_observed": datetime(2025, 1, 1, 13, 0, 0, tzinfo=timezone.utc), | ||
| "number_observed": 5, | ||
| "entities": [ipv4], | ||
| } | ||
| observed_data = ObservedData.model_validate(input_data) | ||
| assert len(observed_data.entities) == 1 | ||
| assert observed_data.entities[0].value == "192.168.1.1" | ||
|
|
||
|
|
||
| def test_observed_data_with_multiple_entities(): | ||
| """Test that ObservedData can be created with multiple entities.""" | ||
| ipv4 = IPV4Address(value="192.168.1.1") | ||
| url = URL(value="https://example.com") | ||
| input_data = { | ||
| "first_observed": datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), | ||
| "last_observed": datetime(2025, 1, 1, 13, 0, 0, tzinfo=timezone.utc), | ||
| "number_observed": 5, | ||
| "entities": [ipv4, url], | ||
| } | ||
| observed_data = ObservedData.model_validate(input_data) | ||
| assert len(observed_data.entities) == 2 | ||
|
|
||
|
|
||
| def test_observed_data_to_stix2_object_returns_valid_stix_object( | ||
| fake_valid_organization_author, | ||
| fake_valid_external_references, | ||
| fake_valid_tlp_markings, | ||
| fake_valid_associated_files, | ||
| ): | ||
| """Test that ObservedData.to_stix2_object returns a valid STIX ObservedData.""" | ||
| input_data = { | ||
| "first_observed": datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), | ||
| "last_observed": datetime(2025, 1, 1, 13, 0, 0, tzinfo=timezone.utc), | ||
| "number_observed": 5, | ||
| "entities": [IPV4Address(value="1.1.1.1")], | ||
| "author": fake_valid_organization_author, | ||
| "external_references": fake_valid_external_references, | ||
| "markings": fake_valid_tlp_markings, | ||
| "associated_files": fake_valid_associated_files, | ||
| "labels": ["test_label"], | ||
| } | ||
| observed_data = ObservedData.model_validate(input_data) | ||
|
|
||
| stix2_obj = observed_data.to_stix2_object() | ||
|
|
||
| assert isinstance(stix2_obj, Stix2ObservedData) | ||
| assert stix2_obj.first_observed == observed_data.first_observed | ||
| assert stix2_obj.last_observed == observed_data.last_observed | ||
| assert stix2_obj.number_observed == observed_data.number_observed | ||
|
|
||
|
|
||
| def test_observed_data_to_stix2_object_with_entities( | ||
| fake_valid_organization_author, | ||
| fake_valid_tlp_markings, | ||
| ): | ||
| """Test that ObservedData.to_stix2_object correctly includes object_refs.""" | ||
| ipv4 = IPV4Address(value="192.168.1.1") | ||
| url = URL(value="https://example.com") | ||
| input_data = { | ||
| "first_observed": datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc), | ||
| "last_observed": datetime(2025, 1, 1, 13, 0, 0, tzinfo=timezone.utc), | ||
| "number_observed": 5, | ||
| "entities": [ipv4, url], | ||
| "author": fake_valid_organization_author, | ||
| "markings": fake_valid_tlp_markings, | ||
| } | ||
| observed_data = ObservedData.model_validate(input_data) | ||
|
|
||
| stix2_obj = observed_data.to_stix2_object() | ||
|
|
||
| assert isinstance(stix2_obj, Stix2ObservedData) | ||
| assert len(stix2_obj.object_refs) == 2 | ||
| assert ipv4.id in stix2_obj.object_refs | ||
| assert url.id in stix2_obj.object_refs |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.