Skip to content

Commit 2478789

Browse files
Merge branch 'main' into feat/export-occurrences-data
2 parents 747708c + d0e0f38 commit 2478789

10 files changed

Lines changed: 143 additions & 45 deletions

File tree

ami/jobs/apps.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
from django.utils.translation import gettext_lazy as _
33

44

5-
class UsersConfig(AppConfig):
5+
class JobsConfig(AppConfig):
66
name = "ami.jobs"
77
verbose_name = _("Jobs")

ami/jobs/models.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -401,10 +401,9 @@ def run(cls, job: "Job"):
401401
total_detections = 0
402402
total_classifications = 0
403403

404-
# Set to low size because our response JSON just got enormous
405-
# @TODO make this configurable
406-
CHUNK_SIZE = 1
407-
chunks = [images[i : i + CHUNK_SIZE] for i in range(0, image_count, CHUNK_SIZE)] # noqa
404+
config = job.pipeline.get_config(project_id=job.project.pk)
405+
chunk_size = config.get("request_source_image_batch_size", 1)
406+
chunks = [images[i : i + chunk_size] for i in range(0, image_count, chunk_size)] # noqa
408407
request_failed_images = []
409408

410409
for i, chunk in enumerate(chunks):
@@ -436,9 +435,9 @@ def run(cls, job: "Job"):
436435
"process",
437436
status=JobState.STARTED,
438437
progress=(i + 1) / len(chunks),
439-
processed=min((i + 1) * CHUNK_SIZE, image_count),
438+
processed=min((i + 1) * chunk_size, image_count),
440439
failed=len(request_failed_images),
441-
remaining=max(image_count - ((i + 1) * CHUNK_SIZE), 0),
440+
remaining=max(image_count - ((i + 1) * chunk_size), 0),
442441
)
443442

444443
# count the completed, successful, and failed save_tasks:

ami/labelstudio/apps.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
from django.utils.translation import gettext_lazy as _
33

44

5-
class UsersConfig(AppConfig):
5+
class LabelStudioConfig(AppConfig):
66
name = "ami.labelstudio"
77
verbose_name = _("Label Studio Integration")

ami/ml/apps.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
from django.utils.translation import gettext_lazy as _
33

44

5-
class UsersConfig(AppConfig):
5+
class MLConfig(AppConfig):
66
name = "ami.ml"
77
verbose_name = _("Machine Learning")
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Generated by Django 4.2.10 on 2025-03-19 16:27
2+
3+
import ami.ml.schemas
4+
from django.db import migrations
5+
import django_pydantic_field.fields
6+
7+
8+
class Migration(migrations.Migration):
9+
dependencies = [
10+
("ml", "0020_projectpipelineconfig_alter_pipeline_projects"),
11+
]
12+
13+
operations = [
14+
migrations.AddField(
15+
model_name="pipeline",
16+
name="default_config",
17+
field=django_pydantic_field.fields.PydanticSchemaField(
18+
config=None,
19+
default=dict,
20+
help_text="The default configuration for the pipeline. Used by both the job sending images to the pipeline and the processing service.",
21+
schema=ami.ml.schemas.PipelineRequestConfigParameters,
22+
),
23+
),
24+
]

ami/ml/models/pipeline.py

Lines changed: 41 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
from typing import TYPE_CHECKING
44

55
if TYPE_CHECKING:
6-
from ami.ml.models import ProcessingService # , ProjectPipelineConfig
6+
from ami.ml.models import ProcessingService, ProjectPipelineConfig
7+
from ami.jobs.models import Job
78

89
import collections
910
import dataclasses
@@ -40,6 +41,7 @@
4041
ClassificationResponse,
4142
DetectionResponse,
4243
PipelineRequest,
44+
PipelineRequestConfigParameters,
4345
PipelineResultsResponse,
4446
SourceImageRequest,
4547
SourceImageResponse,
@@ -98,7 +100,7 @@ def filter_processed_images(
98100
)
99101
# log all algorithms that are in the pipeline but not in the detection
100102
missing_algos = pipeline_algorithm_ids - detection_algorithm_ids
101-
task_logger.info(f"Image #{image.pk} needs classification by pipeline's algorithms: {missing_algos}")
103+
task_logger.debug(f"Image #{image.pk} needs classification by pipeline's algorithms: {missing_algos}")
102104
yield image
103105
else:
104106
# If all detections have been classified by the pipeline, skip the image
@@ -162,9 +164,6 @@ def process_images(
162164
) -> PipelineResultsResponse:
163165
"""
164166
Process images using ML pipeline API.
165-
166-
@TODO find a home for this function.
167-
@TODO break into task chunks.
168167
"""
169168
job = None
170169
task_logger = logger
@@ -201,29 +200,11 @@ def process_images(
201200
if url
202201
]
203202

204-
if project_id:
205-
try:
206-
config = pipeline.project_pipeline_configs.get(project_id=project_id).config
207-
task_logger.info(
208-
f"Sending pipeline request using {config} from the project-pipeline config "
209-
f"for Pipeline {pipeline} and Project id {project_id}."
210-
)
211-
except pipeline.project_pipeline_configs.model.DoesNotExist as e:
212-
task_logger.error(
213-
f"Error getting the project-pipeline config for Pipeline {pipeline} "
214-
f"and Project id {project_id}: {e}"
215-
)
216-
config = {}
217-
task_logger.info(
218-
"Using empty config when sending pipeline request since no project-pipeline config "
219-
f"was found for Pipeline {pipeline} and Project id {project_id}"
220-
)
221-
else:
222-
config = {}
223-
task_logger.info(
224-
"Using empty config when sending pipeline request "
225-
f"since no project id was provided for Pipeline {pipeline}"
226-
)
203+
if not project_id:
204+
task_logger.warning(f"Pipeline {pipeline} is not associated with a project")
205+
206+
config = pipeline.get_config(project_id=project_id)
207+
task_logger.info(f"Using pipeline config: {config}")
227208

228209
request_data = PipelineRequest(
229210
pipeline=pipeline.slug,
@@ -914,7 +895,7 @@ class Pipeline(BaseModel):
914895
description = models.TextField(blank=True)
915896
version = models.IntegerField(default=1)
916897
version_name = models.CharField(max_length=255, blank=True)
917-
# @TODO the algorithms list be retrieved by querying the pipeline endpoint
898+
# @TODO the algorithms attribute is not currently used. Review for removal.
918899
algorithms = models.ManyToManyField("ml.Algorithm", related_name="pipelines")
919900
stages: list[PipelineStage] = SchemaField(
920901
default=default_stages,
@@ -926,8 +907,18 @@ class Pipeline(BaseModel):
926907
projects = models.ManyToManyField(
927908
"main.Project", related_name="pipelines", blank=True, through="ml.ProjectPipelineConfig"
928909
)
910+
default_config: PipelineRequestConfigParameters = SchemaField(
911+
schema=PipelineRequestConfigParameters,
912+
default=dict,
913+
help_text=(
914+
"The default configuration for the pipeline. "
915+
"Used by both the job sending images to the pipeline "
916+
"and the processing service."
917+
),
918+
)
929919
processing_services: models.QuerySet[ProcessingService]
930-
# project_pipeline_configs: models.QuerySet[ProjectPipelineConfig]
920+
project_pipeline_configs: models.QuerySet[ProjectPipelineConfig]
921+
jobs: models.QuerySet[Job]
931922

932923
class Meta:
933924
ordering = ["name", "version"]
@@ -939,6 +930,26 @@ class Meta:
939930
def __str__(self):
940931
return f'#{self.pk} "{self.name}" ({self.slug}) v{self.version}'
941932

933+
def get_config(self, project_id: int | None = None) -> PipelineRequestConfigParameters:
934+
"""
935+
Get the configuration for the pipeline request.
936+
937+
This will be the same as pipeline.default_config, but if a project ID is provided,
938+
the project's pipeline config will be used to override the default config.
939+
"""
940+
config = self.default_config
941+
if project_id:
942+
try:
943+
project_pipeline_config = self.project_pipeline_configs.get(project_id=project_id)
944+
if project_pipeline_config.config:
945+
config.update(project_pipeline_config.config)
946+
logger.debug(
947+
f"Using ProjectPipelineConfig for Pipeline {self} and Project #{project_id}:" f"config: {config}"
948+
)
949+
except self.project_pipeline_configs.model.DoesNotExist as e:
950+
logger.warning(f"No project-pipeline config for Pipeline {self} " f"and Project #{project_id}: {e}")
951+
return config
952+
942953
def collect_images(
943954
self,
944955
collection: SourceImageCollection | None = None,

ami/ml/models/processing_service.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ def create_pipelines(self):
101101
algorithms_created=algorithms_created,
102102
)
103103

104-
def get_status(self):
104+
def get_status(self, timeout=6):
105105
"""
106106
Check the status of the processing service.
107107
This is a simple health check that pings the /readyz endpoint of the service.
@@ -116,7 +116,7 @@ def get_status(self):
116116
resp = None
117117

118118
try:
119-
resp = requests.get(ready_check_url)
119+
resp = requests.get(ready_check_url, timeout=timeout)
120120
resp.raise_for_status()
121121
self.last_checked_live = True
122122
latency = time.time() - start_time
@@ -158,13 +158,13 @@ def get_status(self):
158158

159159
return response
160160

161-
def get_pipeline_configs(self):
161+
def get_pipeline_configs(self, timeout=6):
162162
"""
163163
Get the pipeline configurations from the processing service.
164164
This can be a long response as it includes the full category map for each algorithm.
165165
"""
166166
info_url = urljoin(self.endpoint_url, "info")
167-
resp = requests.get(info_url)
167+
resp = requests.get(info_url, timeout=timeout)
168168
resp.raise_for_status()
169169
info_data = ProcessingServiceInfoResponse.parse_obj(resp.json())
170170
return info_data.pipelines

ami/ml/schemas.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,10 +144,29 @@ class Config:
144144
]
145145

146146

147+
class PipelineRequestConfigParameters(dict):
148+
"""Parameters used to configure a pipeline request.
149+
150+
Accepts any serializable key-value pair.
151+
Example: {"force_reprocess": True, "auth_token": "abc123"}
152+
153+
Supported parameters are defined by the pipeline in the processing service
154+
and should be published in the Pipeline's info response.
155+
156+
Parameters that are used by Antenna before sending the request to the Processing Service
157+
should be prefixed with "request_".
158+
Example: {"request_source_image_batch_size": 8}
159+
Such parameters need to be ignored by the schema in the Processing Service, or
160+
removed before sending the request to the Processing Service.
161+
"""
162+
163+
pass
164+
165+
147166
class PipelineRequest(pydantic.BaseModel):
148167
pipeline: str
149168
source_images: list[SourceImageRequest]
150-
config: dict
169+
config: PipelineRequestConfigParameters | dict | None = None
151170

152171

153172
class PipelineResultsResponse(pydantic.BaseModel):

ami/ml/tests.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,30 @@ def test_yes_reprocess_if_new_terminal_algorithm_same_intermediate(self):
574574
remaining_images_to_process = len(images_again)
575575
self.assertEqual(remaining_images_to_process, len(images), "Images not re-processed with new pipeline")
576576

577+
def test_project_pipeline_config(self):
578+
"""
579+
Test the default_config for a pipeline, as well as the project pipeline config.
580+
Ensure the project pipeline parameters override the pipeline defaults.
581+
"""
582+
from ami.ml.models import ProjectPipelineConfig
583+
from ami.ml.schemas import PipelineRequestConfigParameters
584+
585+
# Add config to the pipeline & project
586+
self.pipeline.default_config = PipelineRequestConfigParameters({"test_param": "test_value"})
587+
self.pipeline.save()
588+
self.project_pipeline_config = ProjectPipelineConfig.objects.create(
589+
project=self.project,
590+
pipeline=self.pipeline,
591+
config={"test_param": "project_value"},
592+
)
593+
self.project_pipeline_config.save()
594+
595+
# Check the final config
596+
default_config = self.pipeline.get_config()
597+
self.assertEqual(default_config["test_param"], "test_value")
598+
final_config = self.pipeline.get_config(self.project.pk)
599+
self.assertEqual(final_config["test_param"], "project_value")
600+
577601

578602
class TestAlgorithmCategoryMaps(TestCase):
579603
def setUp(self):

processing_services/example/api/schemas.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,10 +187,30 @@ class Config:
187187
PipelineChoice = typing.Literal["random", "constant"]
188188

189189

190+
class PipelineRequestConfigParameters(pydantic.BaseModel):
191+
"""Parameters used to configure a pipeline request.
192+
193+
Accepts any serializable key-value pair.
194+
Example: {"force_reprocess": True, "auth_token": "abc123"}
195+
196+
Supported parameters are defined by the pipeline in the processing service
197+
and should be published in the Pipeline's info response.
198+
"""
199+
200+
force_reprocess: bool = pydantic.Field(
201+
default=False,
202+
description="Force reprocessing of the image, even if it has already been processed.",
203+
)
204+
auth_token: str | None = pydantic.Field(
205+
default=None,
206+
description="An optional authentication token to use for the pipeline.",
207+
)
208+
209+
190210
class PipelineRequest(pydantic.BaseModel):
191211
pipeline: PipelineChoice
192212
source_images: list[SourceImageRequest]
193-
config: dict
213+
config: PipelineRequestConfigParameters | dict | None = None
194214

195215
# Example for API docs:
196216
class Config:
@@ -203,6 +223,7 @@ class Config:
203223
"url": "https://archive.org/download/mma_various_moths_and_butterflies_54143/54143.jpg",
204224
}
205225
],
226+
"config": {"force_reprocess": True, "auth_token": "abc123"},
206227
}
207228
}
208229

0 commit comments

Comments (0)