Skip to content

Commit 5ef20e2

Browse files
Add konflux_utils module to monitor Konflux Pipelineruns
CLOUDDST-28645 Signed-off-by: Yashvardhan Nanavati <[email protected]> Assisted-by: Cursor Signed-off-by: Yashvardhan Nanavati <[email protected]>
1 parent 9d0f7bd commit 5ef20e2

File tree

5 files changed

+763
-4
lines changed

5 files changed

+763
-4
lines changed

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,15 @@ The custom configuration options for the Celery workers are listed below:
446446
* `iib_ocp_opm_mapping` - the dictionary mapping of OCP version to OPM version
447447
indicating the OPM version to be used for the corresponding OCP version like
448448
`{"v4.15": "opm-v1.28.0"}`
449+
* `iib_konflux_cluster_url` - the URL of the Konflux OpenShift cluster to access for Tekton PipelineRuns
450+
(e.g. `https://api.konflux.example.com:6443`). This is required for cross-cluster access to Konflux.
451+
* `iib_konflux_cluster_token` - the authentication token for accessing the Konflux OpenShift cluster.
452+
This should be a service account token with appropriate permissions to access Tekton PipelineRuns.
453+
* `iib_konflux_cluster_ca_cert` - the CA certificate for the Konflux OpenShift cluster. This can be
454+
either a file path to the certificate or the certificate content as a string. This is required
455+
for secure cross-cluster access.
456+
* `iib_konflux_namespace` - the namespace in the Konflux cluster where Tekton PipelineRuns are located.
457+
This defaults to `iib-tenant`.
449458

450459

451460
If you wish to configure AWS S3 bucket for storing artifact files, the following **environment variables**

iib/workers/config.py

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,12 @@ class Config(object):
127127
# The minimal version of OPM which requires setting the --migrate-level flag for migrate
128128
iib_opm_new_migrate_version = "v1.46.0"
129129

130+
# Konflux configuration for cross-cluster access
131+
iib_konflux_cluster_url: Optional[str] = None
132+
iib_konflux_cluster_token: Optional[str] = None
133+
iib_konflux_cluster_ca_cert: Optional[str] = None
134+
iib_konflux_namespace: str = 'iib-tenant'
135+
130136

131137
class ProductionConfig(Config):
132138
"""The production IIB Celery configuration."""
@@ -228,9 +234,9 @@ class DevelopmentConfig(Config):
228234
iib_registry: str = 'registry:8443'
229235
iib_request_logs_dir: Optional[str] = '/var/log/iib/requests'
230236
iib_request_related_bundles_dir: Optional[str] = '/var/lib/requests/related_bundles'
231-
iib_request_recursive_related_bundles_dir: Optional[
232-
str
233-
] = '/var/lib/requests/recursive_related_bundles'
237+
iib_request_recursive_related_bundles_dir: Optional[str] = (
238+
'/var/lib/requests/recursive_related_bundles'
239+
)
234240
iib_dogpile_backend: str = 'dogpile.cache.memcached'
235241
iib_ocp_opm_mapping: dict = {
236242
"v4.6": "opm-v1.26.4",
@@ -326,6 +332,7 @@ def validate_celery_config(conf: app.utils.Settings, **kwargs) -> None:
326332

327333
_validate_multiple_opm_mapping(conf['iib_ocp_opm_mapping'])
328334
_validate_iib_org_customizations(conf['iib_organization_customizations'])
335+
_validate_konflux_config(conf)
329336

330337
if conf.get('iib_aws_s3_bucket_name'):
331338
if not isinstance(conf['iib_aws_s3_bucket_name'], str):
@@ -481,6 +488,46 @@ def _validate_iib_org_customizations(
481488
)
482489

483490

491+
def _validate_konflux_config(conf: app.utils.Settings) -> None:
492+
"""
493+
Validate Konflux configuration variables.
494+
495+
:param celery.app.utils.Settings conf: the Celery application configuration to validate
496+
:raises iib.exceptions.ConfigError: if the configuration is invalid
497+
"""
498+
# Check if any Konflux configuration is provided
499+
konflux_url = conf.get('iib_konflux_cluster_url')
500+
konflux_token = conf.get('iib_konflux_cluster_token')
501+
konflux_ca_cert = conf.get('iib_konflux_cluster_ca_cert')
502+
503+
# If any Konflux config is provided, all required fields must be set
504+
if any([konflux_url, konflux_token, konflux_ca_cert]):
505+
if not konflux_url:
506+
raise ConfigError(
507+
'iib_konflux_cluster_url must be set when using Konflux configuration'
508+
)
509+
if not konflux_token:
510+
raise ConfigError(
511+
'iib_konflux_cluster_token must be set when using Konflux configuration'
512+
)
513+
if not konflux_ca_cert:
514+
raise ConfigError(
515+
'iib_konflux_cluster_ca_cert must be set when using Konflux configuration'
516+
)
517+
518+
# Validate URL format
519+
if not isinstance(konflux_url, str) or not konflux_url.startswith('https://'):
520+
raise ConfigError('iib_konflux_cluster_url must be a valid HTTPS URL')
521+
522+
# Validate token is a string
523+
if not isinstance(konflux_token, str):
524+
raise ConfigError('iib_konflux_cluster_token must be a string')
525+
526+
# Validate CA cert is a string
527+
if not isinstance(konflux_ca_cert, str):
528+
raise ConfigError('iib_konflux_cluster_ca_cert must be a string')
529+
530+
484531
def get_worker_config() -> app.utils.Settings:
485532
"""Return the Celery configuration."""
486533
# Import this here to avoid a circular import

iib/workers/tasks/konflux_utils.py

Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
# SPDX-License-Identifier: GPL-3.0-or-later
2+
"""
3+
Konflux utilities for interacting with Tekton pipelineruns in OpenShift clusters.
4+
5+
This module provides functions to find and monitor Konflux pipelineruns triggered by git commits.
6+
It supports cross-cluster access to OpenShift/Kubernetes clusters via IIB Worker configuration.
7+
"""
8+
import logging
9+
import os
10+
import time
11+
from typing import List, Dict, Any, Optional
12+
13+
from kubernetes import client
14+
from kubernetes.client.rest import ApiException
15+
16+
from iib.exceptions import IIBError
17+
from iib.workers.config import get_worker_config
18+
19+
__all__ = ['find_pipelinerun', 'wait_for_pipeline_completion']
20+
21+
log = logging.getLogger(__name__)
22+
23+
# Global variables for Kubernetes client and configuration
24+
_v1_client: Optional[client.CustomObjectsApi] = None
25+
26+
27+
def _get_kubernetes_client() -> client.CustomObjectsApi:
28+
"""
29+
Get or create a Kubernetes CustomObjectsApi client for cross-cluster access.
30+
31+
:return: Configured Kubernetes CustomObjectsApi client
32+
:raises IIBError: If unable to create Kubernetes client or configuration is missing
33+
"""
34+
global _v1_client
35+
36+
if _v1_client is not None:
37+
return _v1_client
38+
39+
try:
40+
# Get configuration from IIB worker config
41+
worker_config = get_worker_config()
42+
43+
# Get cross-cluster configuration (validation is done in config.py)
44+
target_cluster_url = getattr(worker_config, 'iib_konflux_cluster_url', None)
45+
target_cluster_token = getattr(worker_config, 'iib_konflux_cluster_token', None)
46+
target_cluster_ca_cert = getattr(worker_config, 'iib_konflux_cluster_ca_cert', None)
47+
48+
# If no Konflux configuration is provided, raise an error
49+
if not target_cluster_url or not target_cluster_token or not target_cluster_ca_cert:
50+
raise IIBError(
51+
"Konflux configuration is required. Please set "
52+
"iib_konflux_cluster_url, iib_konflux_cluster_token, and "
53+
"iib_konflux_cluster_ca_cert in IIB worker configuration."
54+
)
55+
56+
log.info(
57+
f"Configuring Kubernetes client for cross-cluster access to {target_cluster_url}"
58+
)
59+
60+
configuration = client.Configuration()
61+
configuration.host = target_cluster_url
62+
configuration.api_key_prefix['authorization'] = 'Bearer'
63+
configuration.api_key['authorization'] = target_cluster_token
64+
65+
# If CA cert is provided as a string, write it to a temp file
66+
if not os.path.isfile(target_cluster_ca_cert):
67+
import tempfile
68+
69+
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.crt') as f:
70+
f.write(target_cluster_ca_cert)
71+
target_cluster_ca_cert = f.name
72+
73+
configuration.ssl_ca_cert = target_cluster_ca_cert
74+
75+
_v1_client = client.CustomObjectsApi(client.ApiClient(configuration))
76+
return _v1_client
77+
78+
except IIBError:
79+
# Re-raise IIBError as-is (like CA certificate requirement)
80+
raise
81+
except Exception as e:
82+
# Log error without exposing sensitive information
83+
log.error(f"Failed to initialize Kubernetes client: {type(e).__name__}")
84+
raise IIBError(
85+
f"Failed to initialize Kubernetes client: {type(e).__name__}"
86+
)
87+
88+
89+
def find_pipelinerun(commit_sha: str) -> List[Dict[str, Any]]:
90+
"""
91+
Find the Konflux pipelinerun triggered by the git commit.
92+
93+
:param str commit_sha: The git commit SHA to search for
94+
:return: List of pipelinerun objects matching the commit SHA
95+
:rtype: List[Dict[str, Any]]
96+
:raises IIBError: If there's an error fetching pipelineruns
97+
"""
98+
try:
99+
log.info(f"Searching for pipelineruns with commit SHA: {commit_sha}")
100+
101+
v1_client = _get_kubernetes_client()
102+
worker_config = get_worker_config()
103+
namespace = getattr(worker_config, 'iib_konflux_namespace', 'iib-tenant')
104+
105+
runs = v1_client.list_namespaced_custom_object(
106+
group="tekton.dev",
107+
version="v1",
108+
namespace=namespace,
109+
plural="pipelineruns",
110+
label_selector=f"pipelinesascode.tekton.dev/sha={commit_sha}",
111+
)
112+
113+
items = runs.get("items", [])
114+
log.info(f"Found {len(items)} pipelinerun(s) for commit {commit_sha}")
115+
116+
return items
117+
118+
except ApiException as e:
119+
log.error(f"Kubernetes API error while fetching pipelineruns: {e.status} - {e.reason}")
120+
raise IIBError(
121+
f"Failed to fetch pipelineruns for commit {commit_sha}: API error {e.status}"
122+
)
123+
except Exception as e:
124+
log.error(f"Unexpected error while fetching pipelineruns: {type(e).__name__}")
125+
raise IIBError(
126+
f"Unexpected error while fetching pipelineruns for commit {commit_sha}: "
127+
f"{type(e).__name__}"
128+
)
129+
130+
131+
def wait_for_pipeline_completion(pipelinerun_name: str, timeout: int = 1800) -> None:
132+
"""
133+
Poll the status of a tekton Pipelinerun and wait for completion.
134+
135+
Handles all Tekton PipelineRun status reasons:
136+
- Success: Succeeded, Completed
137+
- Failure: Failed, PipelineRunTimeout, CreateRunFailed, status=False
138+
- Cancellation: Cancelled
139+
140+
:param str pipelinerun_name: Name of the pipelinerun to monitor
141+
:param int timeout: Maximum time to wait in seconds (default: 1800 = 30 mins)
142+
:raises IIBError: If the pipelinerun fails, is cancelled, or times out
143+
"""
144+
log.info(f"Starting to monitor pipelinerun: {pipelinerun_name}")
145+
start_time = time.time()
146+
147+
while True:
148+
try:
149+
# Check if we've exceeded the timeout
150+
elapsed_time = time.time() - start_time
151+
if elapsed_time > timeout:
152+
raise IIBError(
153+
f"Timeout waiting for pipelinerun {pipelinerun_name} to complete "
154+
f"after {timeout} seconds"
155+
)
156+
157+
# Fetch the current status of the pipelinerun
158+
v1_client = _get_kubernetes_client()
159+
worker_config = get_worker_config()
160+
namespace = getattr(worker_config, 'iib_konflux_namespace', 'iib-tenant')
161+
162+
run = v1_client.get_namespaced_custom_object(
163+
group="tekton.dev",
164+
version="v1",
165+
namespace=namespace,
166+
plural="pipelineruns",
167+
name=pipelinerun_name,
168+
)
169+
170+
# Extract status information
171+
status = run.get("status", {})
172+
conditions = status.get("conditions", [])
173+
174+
if not conditions:
175+
log.info(f"Pipelinerun {pipelinerun_name} is still initializing...")
176+
time.sleep(30)
177+
continue
178+
179+
# Get the condition (there's typically only one condition object for PipelineRuns)
180+
condition = conditions[0] if conditions else {}
181+
reason = condition.get("reason", "Unknown")
182+
condition_type = condition.get("type", "Unknown")
183+
status = condition.get("status", "Unknown")
184+
message = condition.get("message", "")
185+
186+
log.info(
187+
f"Pipelinerun {pipelinerun_name} status: reason={reason}, "
188+
f"type={condition_type}, status={status}"
189+
)
190+
if message:
191+
log.info(f"Pipelinerun {pipelinerun_name} message: {message}")
192+
193+
# Check if the pipelinerun has completed based on Tekton status documentation
194+
# https://tekton.dev/docs/pipelines/pipelineruns/#monitoring-execution-status
195+
# Success cases
196+
if reason in ("Succeeded", "Completed"):
197+
log.info(f"Pipelinerun {pipelinerun_name} completed successfully")
198+
return
199+
200+
# Failure cases
201+
elif reason in ("Failed", "PipelineRunTimeout", "CreateRunFailed"):
202+
error_msg = f"Pipelinerun {pipelinerun_name} failed"
203+
if reason == "PipelineRunTimeout":
204+
error_msg += " due to timeout"
205+
elif reason == "CreateRunFailed":
206+
error_msg += " due to resource creation failure"
207+
elif message:
208+
error_msg += f": {message}"
209+
raise IIBError(error_msg)
210+
211+
# Cancellation cases
212+
elif reason == "Cancelled":
213+
raise IIBError(f"Pipelinerun {pipelinerun_name} was cancelled")
214+
215+
# Check for error status (False status indicates failure)
216+
elif status == "False":
217+
error_msg = f"Pipelinerun {pipelinerun_name} failed"
218+
if message:
219+
error_msg += f": {message}"
220+
raise IIBError(error_msg)
221+
222+
# Still running, wait before next check
223+
log.info(
224+
f"Pipelinerun {pipelinerun_name} is still running... (reason: {reason})"
225+
)
226+
time.sleep(30)
227+
228+
except ApiException as e:
229+
log.error(
230+
f"Kubernetes API error while monitoring pipelinerun {pipelinerun_name}: "
231+
f"{e.status} - {e.reason}"
232+
)
233+
raise IIBError(
234+
f"Failed to monitor pipelinerun {pipelinerun_name}: API error {e.status}"
235+
)
236+
except IIBError:
237+
# Re-raise IIBError as-is
238+
raise
239+
except Exception as e:
240+
log.error(
241+
f"Unexpected error while monitoring pipelinerun {pipelinerun_name}: "
242+
f"{type(e).__name__}"
243+
)
244+
raise IIBError(
245+
f"Unexpected error while monitoring pipelinerun {pipelinerun_name}: "
246+
f"{type(e).__name__}"
247+
)

0 commit comments

Comments
 (0)