Add konflux_utils module to monitor Konflux Pipelineruns #1181
Reviewer's Guide
Implement cross-cluster monitoring of Tekton PipelineRuns in Konflux clusters by introducing a new konflux_utils module that reads and validates Celery config options for Konflux, initializes a Kubernetes client for cross-cluster access, and provides functions to find and poll PipelineRuns, with accompanying documentation updates and comprehensive unit tests.
Entity relationship diagram for new Konflux config options
erDiagram
CONFIG {
string iib_konflux_cluster_url
string iib_konflux_cluster_token
string iib_konflux_cluster_ca_cert
string iib_konflux_namespace
}
CONFIG ||--o| KUBERNETES_CLIENT : provides
KUBERNETES_CLIENT {
string host
string api_key
string ssl_ca_cert
}
Class diagram for new konflux_utils module
classDiagram
class konflux_utils {
+find_pipelinerun(commit_sha: str) List[Dict[str, Any]]
+wait_for_pipeline_completion(pipelinerun_name: str, timeout: int = 1800) None
-_get_kubernetes_client() client.CustomObjectsApi
-_v1_client: Optional[client.CustomObjectsApi]
}
konflux_utils --> "1" get_worker_config : uses
konflux_utils --> "1" client.CustomObjectsApi : creates
konflux_utils --> "1" IIBError : raises
konflux_utils --> "1" ApiException : handles
konflux_utils --> "1" logging : logs
konflux_utils --> "1" os : uses
konflux_utils --> "1" time : uses
konflux_utils --> "1" tempfile : uses (if needed)
File-Level Changes
e6f7ecb
to
5ef20e2
Compare
Hey there - I've reviewed your changes - here's some feedback:
- Remove or relocate the unreachable `log.info` call in `_get_kubernetes_client` (it comes right after raising an IIBError).
- Refactor the repeated pattern of fetching `get_worker_config()` and namespace in `find_pipelinerun` and `wait_for_pipeline_completion` into a shared helper to reduce duplication.
- Avoid leaking temp files when writing the CA cert: either clean up the file after use or use `NamedTemporaryFile(delete=True)`.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Remove or relocate the unreachable `log.info` call in `_get_kubernetes_client` (it comes right after raising an IIBError).
- Refactor the repeated pattern of fetching `get_worker_config()` and namespace in `find_pipelinerun` and `wait_for_pipeline_completion` into a shared helper to reduce duplication.
- Avoid leaking temp files when writing the CA cert—either clean up the file after use or use `NamedTemporaryFile(delete=True)`.
## Individual Comments
### Comment 1
<location> `iib/workers/tasks/konflux_utils.py:56-57` </location>
<code_context>
+                "iib_konflux_cluster_ca_cert in IIB worker configuration."
+            )
+
+        log.info(
+            f"Configuring Kubernetes client for cross-cluster access to {target_cluster_url}"
+        )
+
</code_context>
<issue_to_address>
**issue:** log.info is unreachable due to preceding raise statement.
Move the log.info statement before the raise to ensure the log message is recorded.
</issue_to_address>
### Comment 2
<location> `iib/workers/tasks/konflux_utils.py:179-184` </location>
<code_context>
+                continue
+
+            # Get the condition (there's typically only one condition object for PipelineRuns)
+            condition = conditions[0] if conditions else {}
+            reason = condition.get("reason", "Unknown")
+            condition_type = condition.get("type", "Unknown")
</code_context>
<issue_to_address>
**suggestion:** Assumes only one condition object; Tekton may provide multiple.
Instead of checking only the first condition, iterate through all conditions to find the one with type == 'Succeeded'.
```suggestion
            # Find the condition with type == 'Succeeded'
            condition = next((c for c in conditions if c.get("type") == "Succeeded"), {})
            reason = condition.get("reason", "Unknown")
            condition_type = condition.get("type", "Unknown")
            status = condition.get("status", "Unknown")
            message = condition.get("message", "")
```
</issue_to_address>
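To illustrate the suggestion above, here is a small self-contained sketch run against made-up condition data. The helper name `get_succeeded_condition` and the sample payload are assumptions for illustration only; they do not come from the PR or from a real cluster.

```python
from typing import Any, Dict, List


def get_succeeded_condition(conditions: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Return the condition whose type is 'Succeeded', or {} if there is none."""
    return next((c for c in conditions if c.get("type") == "Succeeded"), {})


# Hypothetical status payload carrying more than one condition.
conditions = [
    {"type": "Ready", "status": "True", "reason": "Ready"},
    {"type": "Succeeded", "status": "Unknown", "reason": "Running"},
]
condition = get_succeeded_condition(conditions)
print(condition.get("reason", "Unknown"))  # prints "Running"
```

With `conditions[0]` the same payload would report the reason of the unrelated first entry instead.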
### Comment 3
<location> `tests/test_workers/test_tasks/test_konflux_utils.py:322-336` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Avoid conditionals in tests. ([`no-conditionals-in-tests`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/no-conditionals-in-tests))
<details><summary>Explanation</summary>Avoid complex code, like conditionals, in test functions.
Google's software engineering guidelines say:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:
* loops
* conditionals
Some ways to fix this:
* Use parametrized tests to get rid of the loop.
* Move the complex logic into helpers.
* Move the complex part into pytest fixtures.
> Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.
Software Engineering at Google / [Don't Put Logic in Tests](https://abseil.io/resources/swe-book/html/ch12.html#donapostrophet_put_logic_in_tests)
</details>
</issue_to_address>
### Comment 4
<location> `iib/workers/config.py:505` </location>
<code_context>
def _validate_konflux_config(conf: app.utils.Settings) -> None:
    """
    Validate Konflux configuration variables.
    :param celery.app.utils.Settings conf: the Celery application configuration to validate
    :raises iib.exceptions.ConfigError: if the configuration is invalid
    """
    # Check if any Konflux configuration is provided
    konflux_url = conf.get('iib_konflux_cluster_url')
    konflux_token = conf.get('iib_konflux_cluster_token')
    konflux_ca_cert = conf.get('iib_konflux_cluster_ca_cert')
    # If any Konflux config is provided, all required fields must be set
    if any([konflux_url, konflux_token, konflux_ca_cert]):
        if not konflux_url:
            raise ConfigError(
                'iib_konflux_cluster_url must be set when using Konflux configuration'
            )
        if not konflux_token:
            raise ConfigError(
                'iib_konflux_cluster_token must be set when using Konflux configuration'
            )
        if not konflux_ca_cert:
            raise ConfigError(
                'iib_konflux_cluster_ca_cert must be set when using Konflux configuration'
            )
        # Validate URL format
        if not isinstance(konflux_url, str) or not konflux_url.startswith('https://'):
            raise ConfigError('iib_konflux_cluster_url must be a valid HTTPS URL')
        # Validate token is a string
        if not isinstance(konflux_token, str):
            raise ConfigError('iib_konflux_cluster_token must be a string')
        # Validate CA cert is a string
        if not isinstance(konflux_ca_cert, str):
            raise ConfigError('iib_konflux_cluster_ca_cert must be a string')
</code_context>
<issue_to_address>
**issue (code-quality):** Extract code out into function ([`extract-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-method/))
</issue_to_address>
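One possible shape for the extraction, as a rough sketch only. `ConfigError` here is a local stand-in for `iib.exceptions.ConfigError`, `conf` is treated as a plain mapping, and `_require_konflux_str` and `KONFLUX_KEYS` are hypothetical names. The order in which errors surface differs slightly from the original (each key is checked for presence and type together), so this is not a drop-in replacement.

```python
from typing import Any, Mapping


class ConfigError(Exception):
    """Stand-in for iib.exceptions.ConfigError so the sketch runs on its own."""


KONFLUX_KEYS = (
    "iib_konflux_cluster_url",
    "iib_konflux_cluster_token",
    "iib_konflux_cluster_ca_cert",
)


def _require_konflux_str(conf: Mapping[str, Any], key: str) -> str:
    """Hypothetical helper: return conf[key] if it is a non-empty string."""
    value = conf.get(key)
    if not value:
        raise ConfigError(f"{key} must be set when using Konflux configuration")
    if not isinstance(value, str):
        raise ConfigError(f"{key} must be a string")
    return value


def validate_konflux_config(conf: Mapping[str, Any]) -> None:
    """Validate the Konflux options only when at least one of them is set."""
    if not any(conf.get(key) for key in KONFLUX_KEYS):
        return  # Konflux is not configured; nothing to validate
    url, _token, _ca_cert = (_require_konflux_str(conf, key) for key in KONFLUX_KEYS)
    if not url.startswith("https://"):
        raise ConfigError("iib_konflux_cluster_url must be a valid HTTPS URL")
```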
### Comment 5
<location> `iib/workers/tasks/konflux_utils.py:39` </location>
<code_context>
def _get_kubernetes_client() -> client.CustomObjectsApi:
    """
    Get or create a Kubernetes CustomObjectsApi client for cross-cluster access.
    :return: Configured Kubernetes CustomObjectsApi client
    :raises IIBError: If unable to create Kubernetes client or configuration is missing
    """
    global _v1_client
    if _v1_client is not None:
        return _v1_client
    try:
        # Get configuration from IIB worker config
        worker_config = get_worker_config()
        # Get cross-cluster configuration (validation is done in config.py)
        target_cluster_url = getattr(worker_config, 'iib_konflux_cluster_url', None)
        target_cluster_token = getattr(worker_config, 'iib_konflux_cluster_token', None)
        target_cluster_ca_cert = getattr(worker_config, 'iib_konflux_cluster_ca_cert', None)
        # If no Konflux configuration is provided, raise an error
        if not target_cluster_url or not target_cluster_token or not target_cluster_ca_cert:
            raise IIBError(
                "Konflux configuration is required. Please set "
                "iib_konflux_cluster_url, iib_konflux_cluster_token, and "
                "iib_konflux_cluster_ca_cert in IIB worker configuration."
            )
        log.info(
            f"Configuring Kubernetes client for cross-cluster access to {target_cluster_url}"
        )
        configuration = client.Configuration()
        configuration.host = target_cluster_url
        configuration.api_key_prefix['authorization'] = 'Bearer'
        configuration.api_key['authorization'] = target_cluster_token
        # If CA cert is provided as a string, write it to a temp file
        if not os.path.isfile(target_cluster_ca_cert):
            import tempfile
            with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.crt') as f:
                f.write(target_cluster_ca_cert)
                target_cluster_ca_cert = f.name
        configuration.ssl_ca_cert = target_cluster_ca_cert
        _v1_client = client.CustomObjectsApi(client.ApiClient(configuration))
        return _v1_client
    except IIBError:
        # Re-raise IIBError as-is (like CA certificate requirement)
        raise
    except Exception as e:
        # Log error without exposing sensitive information
        log.error(f"Failed to initialize Kubernetes client: {type(e).__name__}")
        raise IIBError(
            f"Failed to initialize Kubernetes client: {type(e).__name__}"
        )
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Extract code out into function ([`extract-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-method/))
- Remove unreachable code ([`remove-unreachable-code`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-unreachable-code/))
- Inline variable that is immediately returned ([`inline-immediately-returned-variable`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/inline-immediately-returned-variable/))
- Explicitly raise from a previous error ([`raise-from-previous-error`](https://docs.sourcery.ai/Reference/Default-Rules/suggestions/raise-from-previous-error/))
</issue_to_address>
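For the `raise-from-previous-error` item, a minimal sketch of what explicit chaining looks like. `IIBError` is a local stand-in for `iib.exceptions.IIBError`, and `build_client` with its `factory` argument is a hypothetical wrapper used only to keep the example self-contained.

```python
class IIBError(Exception):
    """Stand-in for iib.exceptions.IIBError."""


def build_client(factory):
    """Call factory() and wrap unexpected failures in IIBError."""
    try:
        return factory()
    except IIBError:
        # Already the right type; re-raise unchanged.
        raise
    except Exception as e:
        # The message still carries only the exception type, as in the PR,
        # while `from e` records the original exception as __cause__.
        raise IIBError(
            f"Failed to initialize Kubernetes client: {type(e).__name__}"
        ) from e
```

One trade-off to weigh: chaining makes the original exception, including its message, visible in tracebacks. If the goal of logging only the type name is to keep those details out of logs, `from None` suppresses the chained context instead.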
### Comment 6
<location> `iib/workers/tasks/konflux_utils.py:98` </location>
<code_context>
def find_pipelinerun(commit_sha: str) -> List[Dict[str, Any]]:
    """
    Find the Konflux pipelinerun triggered by the git commit.
    :param str commit_sha: The git commit SHA to search for
    :return: List of pipelinerun objects matching the commit SHA
    :rtype: List[Dict[str, Any]]
    :raises IIBError: If there's an error fetching pipelineruns
    """
    try:
        log.info(f"Searching for pipelineruns with commit SHA: {commit_sha}")
        v1_client = _get_kubernetes_client()
        worker_config = get_worker_config()
        namespace = getattr(worker_config, 'iib_konflux_namespace', 'iib-tenant')
        runs = v1_client.list_namespaced_custom_object(
            group="tekton.dev",
            version="v1",
            namespace=namespace,
            plural="pipelineruns",
            label_selector=f"pipelinesascode.tekton.dev/sha={commit_sha}",
        )
        items = runs.get("items", [])
        log.info(f"Found {len(items)} pipelinerun(s) for commit {commit_sha}")
        return items
    except ApiException as e:
        log.error(f"Kubernetes API error while fetching pipelineruns: {e.status} - {e.reason}")
        raise IIBError(
            f"Failed to fetch pipelineruns for commit {commit_sha}: API error {e.status}"
        )
    except Exception as e:
        log.error(f"Unexpected error while fetching pipelineruns: {type(e).__name__}")
        raise IIBError(
            f"Unexpected error while fetching pipelineruns for commit {commit_sha}: "
            f"{type(e).__name__}"
        )
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Extract code out into function ([`extract-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-method/))
- Explicitly raise from a previous error [×2] ([`raise-from-previous-error`](https://docs.sourcery.ai/Reference/Default-Rules/suggestions/raise-from-previous-error/))
</issue_to_address>
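The client and namespace lookup in this function is repeated in `wait_for_pipeline_completion`, which is what the overall comment about a shared helper refers to. A rough sketch of such a helper follows; the name `get_client_and_namespace` is hypothetical, and the two callables are injected only so the example runs without the IIB package (in the real module the helper would presumably call `_get_kubernetes_client()` and `get_worker_config()` directly).

```python
from types import SimpleNamespace
from typing import Any, Callable, Tuple

DEFAULT_NAMESPACE = "iib-tenant"


def get_client_and_namespace(
    get_client: Callable[[], Any], get_config: Callable[[], Any]
) -> Tuple[Any, str]:
    """Return the API client together with the configured Konflux namespace."""
    namespace = getattr(get_config(), "iib_konflux_namespace", DEFAULT_NAMESPACE)
    return get_client(), namespace


# Stand-ins for _get_kubernetes_client() and get_worker_config().
fake_client = object()
client, namespace = get_client_and_namespace(
    lambda: fake_client,
    lambda: SimpleNamespace(iib_konflux_namespace="my-tenant"),
)
```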
### Comment 7
<location> `iib/workers/tasks/konflux_utils.py:131` </location>
<code_context>
def wait_for_pipeline_completion(pipelinerun_name: str, timeout: int = 1800) -> None:
    """
    Poll the status of a tekton Pipelinerun and wait for completion.
    Handles all Tekton PipelineRun status reasons:
    - Success: Succeeded, Completed
    - Failure: Failed, PipelineRunTimeout, CreateRunFailed, status=False
    - Cancellation: Cancelled
    :param str pipelinerun_name: Name of the pipelinerun to monitor
    :param int timeout: Maximum time to wait in seconds (default: 1800 = 30 mins)
    :raises IIBError: If the pipelinerun fails, is cancelled, or times out
    """
    log.info(f"Starting to monitor pipelinerun: {pipelinerun_name}")
    start_time = time.time()
    while True:
        try:
            # Check if we've exceeded the timeout
            elapsed_time = time.time() - start_time
            if elapsed_time > timeout:
                raise IIBError(
                    f"Timeout waiting for pipelinerun {pipelinerun_name} to complete "
                    f"after {timeout} seconds"
                )
            # Fetch the current status of the pipelinerun
            v1_client = _get_kubernetes_client()
            worker_config = get_worker_config()
            namespace = getattr(worker_config, 'iib_konflux_namespace', 'iib-tenant')
            run = v1_client.get_namespaced_custom_object(
                group="tekton.dev",
                version="v1",
                namespace=namespace,
                plural="pipelineruns",
                name=pipelinerun_name,
            )
            # Extract status information
            status = run.get("status", {})
            conditions = status.get("conditions", [])
            if not conditions:
                log.info(f"Pipelinerun {pipelinerun_name} is still initializing...")
                time.sleep(30)
                continue
            # Get the condition (there's typically only one condition object for PipelineRuns)
            condition = conditions[0] if conditions else {}
            reason = condition.get("reason", "Unknown")
            condition_type = condition.get("type", "Unknown")
            status = condition.get("status", "Unknown")
            message = condition.get("message", "")
            log.info(
                f"Pipelinerun {pipelinerun_name} status: reason={reason}, "
                f"type={condition_type}, status={status}"
            )
            if message:
                log.info(f"Pipelinerun {pipelinerun_name} message: {message}")
            # Check if the pipelinerun has completed based on Tekton status documentation
            # https://tekton.dev/docs/pipelines/pipelineruns/#monitoring-execution-status
            # Success cases
            if reason in ("Succeeded", "Completed"):
                log.info(f"Pipelinerun {pipelinerun_name} completed successfully")
                return
            # Failure cases
            elif reason in ("Failed", "PipelineRunTimeout", "CreateRunFailed"):
                error_msg = f"Pipelinerun {pipelinerun_name} failed"
                if reason == "PipelineRunTimeout":
                    error_msg += " due to timeout"
                elif reason == "CreateRunFailed":
                    error_msg += " due to resource creation failure"
                elif message:
                    error_msg += f": {message}"
                raise IIBError(error_msg)
            # Cancellation cases
            elif reason == "Cancelled":
                raise IIBError(f"Pipelinerun {pipelinerun_name} was cancelled")
            # Check for error status (False status indicates failure)
            elif status == "False":
                error_msg = f"Pipelinerun {pipelinerun_name} failed"
                if message:
                    error_msg += f": {message}"
                raise IIBError(error_msg)
            # Still running, wait before next check
            log.info(
                f"Pipelinerun {pipelinerun_name} is still running... (reason: {reason})"
            )
            time.sleep(30)
        except ApiException as e:
            log.error(
                f"Kubernetes API error while monitoring pipelinerun {pipelinerun_name}: "
                f"{e.status} - {e.reason}"
            )
            raise IIBError(
                f"Failed to monitor pipelinerun {pipelinerun_name}: API error {e.status}"
            )
        except IIBError:
            # Re-raise IIBError as-is
            raise
        except Exception as e:
            log.error(
                f"Unexpected error while monitoring pipelinerun {pipelinerun_name}: "
                f"{type(e).__name__}"
            )
            raise IIBError(
                f"Unexpected error while monitoring pipelinerun {pipelinerun_name}: "
                f"{type(e).__name__}"
            )
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Explicitly raise from a previous error [×2] ([`raise-from-previous-error`](https://docs.sourcery.ai/Reference/Default-Rules/suggestions/raise-from-previous-error/))
- Low code quality found in wait\_for\_pipeline\_completion - 25% ([`low-code-quality`](https://docs.sourcery.ai/Reference/Default-Rules/comments/low-code-quality/))
<br/><details><summary>Explanation</summary>
The quality score for this function is below the quality threshold of 25%.
This score is a combination of the method length, cognitive complexity and working memory.
How can you solve this?
It might be worth refactoring this function to make it shorter and more readable.
- Reduce the function length by extracting pieces of functionality out into
their own functions. This is the most important thing you can do - ideally a
function should be less than 10 lines.
- Reduce nesting, perhaps by introducing guard clauses to return early.
- Ensure that variables are tightly scoped, so that code using related concepts
sits together within the function rather than being scattered.</details>
</issue_to_address>
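As one way to act on the "extract pieces of functionality" advice, the status interpretation can be separated from the polling loop. The sketch below is an assumption-laden illustration, not the PR's code: `classify` and `wait_until_done` are hypothetical names, `RuntimeError` and `TimeoutError` stand in for `IIBError`, the reason sets are copied from the function above, and the fetch, sleep, and clock callables are injected so the loop can be exercised without a cluster or real waiting.

```python
import time
from typing import Any, Callable, Dict, List

SUCCESS_REASONS = {"Succeeded", "Completed"}
FAILURE_REASONS = {"Failed", "PipelineRunTimeout", "CreateRunFailed", "Cancelled"}


def classify(conditions: List[Dict[str, Any]]) -> str:
    """Map a PipelineRun's conditions to 'succeeded', 'failed' or 'running'."""
    condition = next((c for c in conditions if c.get("type") == "Succeeded"), {})
    reason = condition.get("reason", "Unknown")
    if reason in SUCCESS_REASONS:
        return "succeeded"
    if reason in FAILURE_REASONS or condition.get("status") == "False":
        return "failed"
    return "running"  # also covers a run with no conditions yet


def wait_until_done(
    fetch_conditions: Callable[[], List[Dict[str, Any]]],
    timeout: float = 1800,
    interval: float = 30,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    """Poll fetch_conditions() until the run finishes or the deadline passes."""
    deadline = clock() + timeout
    while clock() <= deadline:
        state = classify(fetch_conditions())
        if state == "succeeded":
            return
        if state == "failed":
            raise RuntimeError("pipelinerun failed")
        sleep(interval)
    raise TimeoutError(f"pipelinerun did not complete within {timeout} seconds")
```

Keeping `classify` free of I/O also means the status table can be covered by parametrized tests with no mocking of the Kubernetes client.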
8f0115a
to
06e82b8
Compare
86417cb
to
bbb8897
Compare
bbb8897
to
87bfe32
Compare
87bfe32
to
6b7862d
Compare
Overall LGTM, just small comments on deps
4e75ee4
to
556495a
Compare
CLOUDDST-28645 Signed-off-by: Yashvardhan Nanavati <[email protected]> Assisted-by: Cursor Signed-off-by: Yashvardhan Nanavati <[email protected]>
This commit adds kubernetes package as a dependency in IIB. NOTE: It downgrades urllib3 to 2.3.0 from 2.3.5 because kubernetes requires urllib3<2.4.0. Signed-off-by: Yashvardhan Nanavati <[email protected]>
556495a
to
6ec0e77
Compare
CLOUDDST-28645
Assisted-by: Cursor
Summary by Sourcery
Add support for cross-cluster Konflux PipelineRun monitoring by extending worker configuration, validating the new options, implementing konflux_utils functions, and covering them with documentation and tests
New Features:
Enhancements:
Documentation:
Tests: