Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 170 additions & 0 deletions end_to_end_tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import logging
import uuid
import json
import zipfile
import random
import os

Expand Down Expand Up @@ -613,5 +614,174 @@ def test_delete_timeline(self):
_ = sketch.lazyload_data(refresh_cache=True)
self.assertions.assertEqual(len(sketch.list_timelines()), 1)

def test_export_sketch(self):
"""Test exporting a sketch via the API client."""
# 1. Ensure the sketch has some data to export.

# create a new sketch
rand = random.randint(0, 10000)
sketch = self.api.create_sketch(
name=f"test_delete_timeline {rand}", description="test_delete_timeline"
)
self.sketch = sketch
file_path = (
"/usr/local/src/timesketch/end_to_end_tests/test_data/evtx_20250918.plaso"
)

self.import_timeline(file_path, sketch=sketch)

# 2. Call the export method on the sketch object.
export_file_path = "/tmp/export.zip"
exported_zip_file = self.sketch.export(export_file_path)

# 3. Verify the contents of the returned zip file.
self.assertions.assertTrue(
zipfile.is_zipfile(export_file_path), "Exported file is not a valid zip."
)

with zipfile.ZipFile(export_file_path, "r") as zipf:
# Check for expected files in the archive
self.assertions.assertIn("METADATA", zipf.namelist())
self.assertions.assertIn("events/starred_events.csv", zipf.namelist())

# Check the content of the metadata file
with zipf.open("METADATA") as meta_file:
metadata = json.loads(meta_file.read().decode("utf-8"))
self.assertions.assertEqual(metadata.get("sketch_id"), self.sketch.id)
self.assertions.assertEqual(
metadata.get("sketch_name"), self.sketch.name
)

def test_annotate_event(self):
"""Test annotating an event with a comment and a label."""
sketch = self.api.create_sketch(
name="test_annotate_event", description="A sketch for annotation testing"
)

# 1. Import a timeline with a known event.
self.import_timeline("sigma_events.csv", sketch=sketch)
_ = sketch.lazyload_data(refresh_cache=True)

# 2. Get an event to annotate.
search_client = search.Search(sketch)
search_client.query_string = 'source_short:"LOG"'
search_response = json.loads(search_client.json)
self.assertions.assertGreater(
len(search_response["objects"]), 0, "No events found to annotate"
)
event_to_annotate = search_response["objects"][0]
event_id = event_to_annotate["_id"]
index_id = event_to_annotate["_index"]

# 3. Add a comment and a label.
comment_text = "This is a test comment."
label_text = "test_label"
sketch.add_comment_to_event(event_id, index_id, comment_text)
sketch.add_label_to_event(event_id, index_id, label_text)

# 4. Retrieve the event again to verify the annotations.
# A short delay might be needed for the changes to be indexed.
time.sleep(2)
annotated_event_data = sketch.get_event(event_id, index_id)

# Verify the comment
comments = annotated_event_data.get("meta", {}).get("comments", [])
self.assertions.assertEqual(len(comments), 1)
self.assertions.assertEqual(comments[0]["comment"], comment_text)

# Verify the label
labels = annotated_event_data.get("objects", {}).get("label", [])
self.assertions.assertIn(
label_text, labels, "The test label was not found on the event."
)

def test_toggle_event_label(self):
"""Test toggling a label on an event."""
sketch = self.api.create_sketch(
name="test_toggle_event_label",
description="A sketch for label toggling",
)

# 1. Import a timeline with a known event.
self.import_timeline("sigma_events.csv", sketch=sketch)
_ = sketch.lazyload_data(refresh_cache=True)

# 2. Get an event to annotate.
search_client = search.Search(sketch)
search_client.query_string = 'source_short:"LOG"'
search_response = json.loads(search_client.json)
self.assertions.assertGreater(
len(search_response["objects"]), 0, "No events found to annotate"
)
event_to_annotate = search_response["objects"][0]
event_id = event_to_annotate["_id"]
index_id = event_to_annotate["_index"]
label_to_toggle = "__ts_star"

# 3. Add the label.
sketch.add_label_to_event(event_id, index_id, label_to_toggle)

# 4. Verify the label was added.
time.sleep(2) # Allow for indexing.
event_after_add = sketch.get_event(event_id, index_id)
labels_after_add = event_after_add.get("objects", {}).get("label", [])
self.assertions.assertIn(
label_to_toggle, labels_after_add, "The star label was not added."
)

# 5. Remove the label.
sketch.add_label_to_event(event_id, index_id, label_to_toggle, remove=True)

# 6. Verify the label was removed.
time.sleep(2) # Allow for indexing.
event_after_remove = sketch.get_event(event_id, index_id)
labels_after_remove = event_after_remove.get("objects", {}).get("label", [])
self.assertions.assertNotIn(
label_to_toggle,
labels_after_remove,
"The star label was not removed.",
)

def test_add_existing_star_label_bug(self):
"""Test adding an existing star label to an event.

This test is designed to replicate a bug where re-adding a special
label (like __ts_star) incorrectly removes it. The correct behavior
is for the label to remain. This test will fail until the bug is
fixed.
"""
sketch = self.api.create_sketch(
name="test_add_existing_star_label",
description="A sketch for testing adding existing labels",
)

# 1. Import a timeline.
self.import_timeline("sigma_events.csv", sketch=sketch)
_ = sketch.lazyload_data(refresh_cache=True)

# 2. Get an event to star.
search_client = search.Search(sketch)
search_client.query_string = 'source_short:"LOG"'
search_response = json.loads(search_client.json)
self.assertions.assertGreater(
len(search_response["objects"]), 0, "No events found to annotate"
)
event_to_annotate = search_response["objects"][0]
event_id = event_to_annotate["_id"]
index_id = event_to_annotate["_index"]
star_label = "__ts_star"

# 3. Add the star label for the first time.
sketch.add_label_to_event(event_id, index_id, star_label)

# 4. Add the same label again. This should be a no-op.
sketch.add_label_to_event(event_id, index_id, star_label, remove=False)

# 5. Verify the label still exists. This will fail with the buggy code.
time.sleep(2) # Allow for indexing.
event_after_second_add = sketch.get_event(event_id, index_id)
labels = event_after_second_add.get("objects", {}).get("label", [])
self.assertions.assertIn(star_label, labels, "The star label was removed.")


manager.EndToEndTestManager.register_test(ClientTest)
Loading
Loading