Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only wipe user content between tests #2871

Merged
merged 8 commits into from
Apr 11, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
294 changes: 3 additions & 291 deletions test_elasticsearch/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
from typing import Optional, Tuple

from elasticsearch import (
AuthorizationException,
ConnectionError,
Elasticsearch,
NotFoundError,
)

SOURCE_DIR = Path(__file__).absolute().parent.parent
Expand Down Expand Up @@ -118,40 +116,15 @@ def wipe_cluster(client):
except ImportError:
pass

is_xpack = True
if is_xpack:
wipe_rollup_jobs(client)
wait_for_pending_tasks(client, filter="xpack/rollup/job")
wipe_slm_policies(client)

# Searchable snapshot indices start in 7.8+
if es_version(client) >= (7, 8):
wipe_searchable_snapshot_indices(client)

wipe_snapshots(client)
if is_xpack:
wipe_data_streams(client)
wipe_data_streams(client)
wipe_indices(client)

if is_xpack:
wipe_xpack_templates(client)
else:
client.indices.delete_template(name="*")
client.indices.delete_index_template(name="*")
client.cluster.delete_component_template(name="*")
client.indices.delete_template(name="*")
client.indices.delete_index_template(name="*")

wipe_cluster_settings(client)

if is_xpack:
wipe_ilm_policies(client)
wipe_auto_follow_patterns(client)
wipe_tasks(client)
wipe_node_shutdown_metadata(client)
wait_for_pending_datafeeds_and_jobs(client)
wipe_calendars(client)
wipe_filters(client)
wipe_transforms(client)

wait_for_cluster_state_updates_to_finish(client)
if close_after_wipe:
client.close()
Expand All @@ -169,16 +142,6 @@ def wipe_cluster_settings(client):
client.cluster.put_settings(body=new_settings)


def wipe_rollup_jobs(client):
    """Stop and delete every rollup job on the cluster."""
    for job in client.rollup.get_jobs(id="_all").get("jobs", ()):
        job_id = job["config"]["id"]
        # A job must be stopped before it can be deleted; tolerate jobs
        # that have already disappeared (404).
        lenient = client.options(ignore_status=404)
        lenient.rollup.stop_job(id=job_id, wait_for_completion=True)
        lenient.rollup.delete_job(id=job_id)


def wipe_snapshots(client):
"""Deletes all the snapshots and repositories from the cluster"""
in_progress_snapshots = []
Expand Down Expand Up @@ -223,259 +186,8 @@ def wipe_indices(client):
)


def wipe_searchable_snapshot_indices(client):
    """Delete every index whose settings mark it as a searchable snapshot."""
    state = client.cluster.state(
        metric="metadata",
        filter_path="metadata.indices.*.settings.index.store.snapshot",
    )
    # The filter_path yields an empty response when no such index exists.
    if not state:
        return
    for index_name in state["metadata"]["indices"]:
        client.indices.delete(index=index_name)


def wipe_xpack_templates(client):
    """Remove user-created index and component templates, preserving the
    built-in X-Pack ones (see ``is_xpack_template``)."""
    # Index templates (legacy and composable), listed via the cat API.
    cat_output = client.cat.templates(h="name")
    for name in (line.strip() for line in cat_output.split("\n")):
        if not name or is_xpack_template(name):
            continue
        try:
            client.indices.delete_template(name=name)
        except NotFoundError as err:
            # The cat API also lists composable templates, which the legacy
            # delete endpoint rejects with this "missing" message — fall back
            # to the composable-template delete endpoint for those.
            if f"index_template [{name}] missing" in str(err):
                client.indices.delete_index_template(name=name)

    # Component templates, again skipping the X-Pack-owned ones.
    component = client.cluster.get_component_template()["component_templates"]
    doomed = [
        entry["name"] for entry in component if not is_xpack_template(entry["name"])
    ]
    if doomed:
        client.cluster.delete_component_template(name=",".join(doomed))


def wipe_ilm_policies(client):
    """Delete all ILM lifecycle policies except built-in/managed ones.

    The literal set below plus the substring checks identify policies that
    ship with Elasticsearch (or Elastic Cloud) and must survive between
    tests; everything else is removed.
    """
    for policy in client.ilm.get_lifecycle():
        if (
            policy
            not in {
                "ilm-history-ilm-policy",
                "slm-history-ilm-policy",
                "watch-history-ilm-policy",
                "watch-history-ilm-policy-16",
                "ml-size-based-ilm-policy",
                "logs",
                "metrics",
                "synthetics",
                "7-days-default",
                "30-days-default",
                "90-days-default",
                "180-days-default",
                "365-days-default",
                ".fleet-actions-results-ilm-policy",
                ".deprecation-indexing-ilm-policy",
                ".monitoring-8-ilm-policy",
            }
            # NOTE(review): "polcy" looks like a typo for "policy" — confirm
            # whether this deliberately matches a misspelled server-side
            # policy name before changing it.
            and "-history-ilm-polcy" not in policy
            and "-meta-ilm-policy" not in policy
            and "-data-ilm-policy" not in policy
            # "@lifecycle" suffix marks managed lifecycle policies.
            and "@lifecycle" not in policy
        ):
            client.ilm.delete_lifecycle(name=policy)


def wipe_slm_policies(client):
    """Delete all SLM policies except the Cloud-managed snapshot policy."""
    preserved = {"cloud-snapshot-policy"}
    for policy_name in client.slm.get_lifecycle():
        if policy_name in preserved:
            continue
        client.slm.delete_lifecycle(policy_id=policy_name)


def wipe_auto_follow_patterns(client):
    """Delete every cross-cluster-replication auto-follow pattern."""
    patterns = client.ccr.get_auto_follow_pattern()["patterns"]
    for entry in patterns:
        client.ccr.delete_auto_follow_pattern(name=entry["name"])


def wipe_node_shutdown_metadata(client):
    """Clear any registered node-shutdown records.

    Silently skipped when the shutdown feature flag is off, or when the
    deployment (e.g. Elastic Cloud) forbids the API.
    """
    try:
        status = client.shutdown.get_node()
        # A response shaped like a plain nodes-info answer (has "_nodes"
        # and "cluster_name") means the feature flag isn't enabled on this
        # cluster, so there is nothing to clean up.
        if "_nodes" in status and "cluster_name" in status:
            return
        for node in status.get("nodes", []):
            client.shutdown.delete_node(node_id=node["node_id"])
    except AuthorizationException:
        # Elastic Cloud doesn't allow this so we skip.
        pass


def wipe_tasks(client):
    """Cancel every task currently tracked by the cluster.

    Iterates the per-node task map returned by the tasks API and cancels
    each task, waiting for the cancellation to complete.
    """
    tasks = client.tasks.list()
    # The tasks API groups tasks under the "nodes" key; the previous code
    # read "node", which never matched, so no task was ever cancelled.
    for node_name, node in tasks.get("nodes", {}).items():
        for task_id in node.get("tasks", ()):
            client.tasks.cancel(task_id=task_id, wait_for_completion=True)


def wait_for_pending_tasks(client, filter, timeout=30):
    """Poll the cat tasks API until no task matches ``filter`` or until
    ``timeout`` seconds elapse.

    :param filter: substring matched against each line of ``cat.tasks``
        output. (Parameter name shadows the builtin but is part of the
        public interface — callers pass it by keyword.)
    :param timeout: maximum number of seconds to keep polling.
    """
    end_time = time.time() + timeout
    while time.time() < end_time:
        tasks = client.cat.tasks(detailed=True).split("\n")
        if not any(filter in task for task in tasks):
            break
        # Back off briefly instead of busy-polling the cluster.
        time.sleep(0.5)


def wait_for_pending_datafeeds_and_jobs(client: Elasticsearch, timeout=30):
    """Delete all ML datafeeds, anomaly-detection jobs and data frame
    analytics, polling each category until it is empty or until
    ``timeout`` seconds elapse per category."""
    # Datafeeds first: they reference jobs, so they must go before jobs.
    deadline = time.time() + timeout
    while time.time() < deadline:
        datafeeds = client.ml.get_datafeeds(datafeed_id="*", allow_no_match=True)
        if datafeeds["count"] == 0:
            break
        for feed in datafeeds["datafeeds"]:
            client.options(ignore_status=404).ml.delete_datafeed(
                datafeed_id=feed["datafeed_id"]
            )

    # Anomaly-detection jobs: close, then delete.
    deadline = time.time() + timeout
    while time.time() < deadline:
        jobs = client.ml.get_jobs(job_id="*", allow_no_match=True)
        if jobs["count"] == 0:
            break
        for job in jobs["jobs"]:
            lenient = client.options(ignore_status=404)
            lenient.ml.close_job(job_id=job["job_id"])
            lenient.ml.delete_job(job_id=job["job_id"])

    # Data frame analytics: stop, then delete.
    deadline = time.time() + timeout
    while time.time() < deadline:
        analytics = client.ml.get_data_frame_analytics(id="*")
        if analytics["count"] == 0:
            break
        for job in analytics["data_frame_analytics"]:
            lenient = client.options(ignore_status=404)
            lenient.ml.stop_data_frame_analytics(id=job["id"])
            lenient.ml.delete_data_frame_analytics(id=job["id"])


def wipe_filters(client: Elasticsearch, timeout=30):
    """Delete every ML filter, retrying until none remain or ``timeout``
    seconds elapse."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        response = client.ml.get_filters(filter_id="*")
        if response["count"] == 0:
            break
        for ml_filter in response["filters"]:
            client.options(ignore_status=404).ml.delete_filter(
                filter_id=ml_filter["filter_id"]
            )


def wipe_calendars(client: Elasticsearch, timeout=30):
    """Delete every ML calendar, retrying until none remain or ``timeout``
    seconds elapse."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        response = client.ml.get_calendars(calendar_id="*")
        if response["count"] == 0:
            break
        for calendar in response["calendars"]:
            client.options(ignore_status=404).ml.delete_calendar(
                calendar_id=calendar["calendar_id"]
            )


def wipe_transforms(client: Elasticsearch, timeout=30):
    """Stop and delete every transform, retrying until none remain or
    ``timeout`` seconds elapse."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        response = client.transform.get_transform(transform_id="*")
        if response["count"] == 0:
            break
        for transform in response["transforms"]:
            lenient = client.options(ignore_status=404)
            lenient.transform.stop_transform(transform_id=transform["id"])
            lenient.transform.delete_transform(transform_id=transform["id"])


def wait_for_cluster_state_updates_to_finish(client, timeout=30):
    """Poll until the master has no pending cluster-state tasks, or until
    ``timeout`` seconds elapse."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        pending = client.cluster.pending_tasks().get("tasks", ())
        if not pending:
            break


def is_xpack_template(name):
    """Return True if ``name`` is a template that ships with X-Pack and
    must therefore be preserved between tests."""
    # Hidden templates and well-known managed name prefixes.
    if name.startswith(
        (
            ".",
            "behavioral_analytics-events",
            "elastic-connectors-",
            "entities_v1_",
        )
    ):
        return True
    # Managed-template naming suffixes.
    if name.endswith(("@ilm", "@template")):
        return True

    # Exact names of templates bundled with the stack.
    return name in {
        "agentless",
        "agentless@mappings",
        "agentless@settings",
        "apm-10d@lifecycle",
        "apm-180d@lifecycle",
        "apm-390d@lifecycle",
        "apm-90d@lifecycle",
        "apm@mappings",
        "apm@settings",
        "data-streams-mappings",
        "data-streams@mappings",
        "elastic-connectors",
        "ecs@dynamic_templates",
        "ecs@mappings",
        "ilm-history-7",
        "kibana-reporting@settings",
        "logs",
        "logs-apm.error@mappings",
        "logs-apm@settings",
        "logs-mappings",
        "logs@mappings",
        "logs-settings",
        "logs@settings",
        "metrics",
        "metrics-apm@mappings",
        "metrics-apm.service_destination@mappings",
        "metrics-apm.service_summary@mappings",
        "metrics-apm.service_transaction@mappings",
        "metrics-apm@settings",
        "metrics-apm.transaction@mappings",
        "metrics-mappings",
        "metrics@mappings",
        "metrics-settings",
        "metrics@settings",
        "metrics-tsdb-settings",
        "metrics@tsdb-settings",
        "search-acl-filter",
        "synthetics",
        "synthetics-mappings",
        "synthetics@mappings",
        "synthetics-settings",
        "synthetics@settings",
        "traces-apm@mappings",
        "traces-apm.rum@mappings",
        "traces@mappings",
        "traces@settings",
        # otel
        "metrics-otel@mappings",
        "semconv-resource-to-ecs@mappings",
        "traces-otel@mappings",
        "ecs-tsdb@mappings",
        "logs-otel@mappings",
        "otel@mappings",
    }
Loading