2222import static com .michelin .ns4kafka .util .FormatErrorUtils .invalidSchemaResource ;
2323import static com .michelin .ns4kafka .util .FormatErrorUtils .invalidSchemaSubjectName ;
2424
25+ import com .fasterxml .jackson .databind .JsonNode ;
26+ import com .fasterxml .jackson .databind .ObjectMapper ;
2527import com .michelin .ns4kafka .model .AccessControlEntry ;
2628import com .michelin .ns4kafka .model .Metadata ;
2729import com .michelin .ns4kafka .model .Namespace ;
3335import com .michelin .ns4kafka .service .client .schema .entities .SchemaRequest ;
3436import com .michelin .ns4kafka .service .client .schema .entities .SchemaResponse ;
3537import com .michelin .ns4kafka .util .RegexUtils ;
36- import com .michelin .ns4kafka .validation .SchemaSubjectNameValidator ;
3738import io .confluent .kafka .schemaregistry .avro .AvroSchema ;
3839import io .confluent .kafka .schemaregistry .client .rest .entities .SchemaReference ;
3940import io .micronaut .core .util .CollectionUtils ;
5354@ Slf4j
5455@ Singleton
5556public class SchemaService {
57+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper ();
5658 @ Inject
5759 private AclService aclService ;
5860
@@ -72,7 +74,7 @@ public Flux<Schema> findAllForNamespace(Namespace namespace) {
7274 return schemaRegistryClient
7375 .getSubjects (namespace .getMetadata ().getCluster ())
7476 .filter (subject -> {
75- String underlyingTopicName = SchemaSubjectNameValidator . extractTopicName (subject , namingStrategies )
77+ String underlyingTopicName = extractTopicName (subject , namingStrategies )
7678 .orElse ("" );
7779 return aclService .isResourceCoveredByAcls (acls , underlyingTopicName );
7880 })
@@ -203,7 +205,7 @@ public Mono<List<String>> validateSchema(Namespace namespace, Schema schema) {
203205 List <String > validationErrors = new ArrayList <>();
204206 List <SubjectNameStrategy > namingStrategies = getValidSubjectNameStrategies (namespace );
205207 String subjectName = schema .getMetadata ().getName ();
206- boolean isValid = SchemaSubjectNameValidator . validateSubjectName (
208+ boolean isValid = validateSubjectName (
207209 subjectName ,
208210 namingStrategies ,
209211 schema .getSpec ().getSchema (),
@@ -362,7 +364,7 @@ public Mono<SchemaCompatibilityResponse> updateSubjectCompatibility(
362364 */
363365 public boolean isNamespaceOwnerOfSubject (Namespace namespace , String subjectName ) {
364366 List <SubjectNameStrategy > namingStrategies = getValidSubjectNameStrategies (namespace );
365- String underlyingTopicName = SchemaSubjectNameValidator . extractTopicName (subjectName , namingStrategies )
367+ String underlyingTopicName = extractTopicName (subjectName , namingStrategies )
366368 .orElse ("" );
367369 return aclService .isNamespaceOwnerOfResource (
368370 namespace .getMetadata ().getName (), AccessControlEntry .ResourceType .TOPIC , underlyingTopicName );
@@ -452,4 +454,132 @@ private List<SubjectNameStrategy> getValidSubjectNameStrategies(Namespace namesp
452454 }
453455 return List .of (SubjectNameStrategy .DEFAULT );
454456 }
457+
458+
459+ /**
460+ * Validates that a schema subject name follows the specified naming strategy.
461+ *
462+ * @param subjectName The schema subject name to validate
463+ * @param schemaContent The schema content (for extracting record names)
464+ * @param schemaType The schema type (AVRO, JSON, PROTOBUF)
465+ * @return true if the subject name is valid for any of the strategies, false otherwise
466+ */
467+ public static boolean validateSubjectName (
468+ String subjectName ,
469+ List <SubjectNameStrategy > validStrategies ,
470+ String schemaContent ,
471+ Schema .SchemaType schemaType ) {
472+ if (subjectName == null || subjectName .trim ().isEmpty ()) {
473+ return false ;
474+ }
475+ for (SubjectNameStrategy strategy : validStrategies ) {
476+ if (validateSubjectNameWithStrategy (subjectName , strategy , schemaContent , schemaType )) {
477+ return true ;
478+ }
479+ }
480+ return false ;
481+ }
482+
483+ public static boolean validateSubjectNameWithStrategy (
484+ String subjectName , SubjectNameStrategy strategy , String schemaContent , Schema .SchemaType schemaType ) {
485+ // https://github.com/confluentinc/schema-registry/blob/master/schema-serializer/src/main/java/io/confluent/kafka/serializers/subject
486+ switch (strategy ) {
487+ case TOPIC_NAME :
488+ String topicName = extractTopicName (subjectName , strategy ).orElse ("" );
489+ return subjectName .equals (topicName + "-key" ) || subjectName .equals (topicName + "-value" );
490+ case TOPIC_RECORD_NAME :
491+ String topicName2 = extractTopicName (subjectName , strategy ).orElse ("" );
492+ Optional <String > recordName = extractRecordName (schemaContent , schemaType );
493+ return recordName .isPresent () && subjectName .equals (topicName2 + "-" + recordName .get ());
494+ case RECORD_NAME :
495+ Optional <String > recordNameOnly = extractRecordName (schemaContent , schemaType );
496+ return recordNameOnly .isPresent () && subjectName .equals (recordNameOnly .get ());
497+ default :
498+ return false ;
499+ }
500+ }
501+
502+ /**
503+ * Extracts the record name from schema content based on schema type. /!\ Only AVRO schema are handled.
504+ * TopicRecordName Strategy will not be valid for any other schema.
505+ *
506+ * @param schemaContent The schema content as string
507+ * @param schemaType The type of schema (AVRO, JSON, PROTOBUF)
508+ * @return Optional containing the record name if found
509+ */
510+ public static Optional <String > extractRecordName (String schemaContent , Schema .SchemaType schemaType ) {
511+ if (schemaContent == null || schemaContent .trim ().isEmpty ()) {
512+ return Optional .empty ();
513+ }
514+
515+ try {
516+ switch (schemaType ) {
517+ case AVRO :
518+ return extractAvroRecordName (schemaContent );
519+ default :
520+ log .warn ("Unsupported schema type for record name extraction: {}" , schemaType );
521+ return Optional .empty ();
522+ }
523+ } catch (Exception e ) {
524+ log .error ("Failed to extract record name from schema content" , e );
525+ return Optional .empty ();
526+ }
527+ }
528+
529+ private static Optional <String > extractAvroRecordName (String schemaContent ) {
530+ try {
531+ JsonNode schemaNode = OBJECT_MAPPER .readTree (schemaContent );
532+ String recordClassName = "" ;
533+ if (schemaNode .has ("namespace" )) {
534+ recordClassName += schemaNode .get ("namespace" ).asText () + "." ;
535+ }
536+ if (schemaNode .has ("name" )) {
537+ recordClassName += schemaNode .get ("name" ).asText ();
538+ }
539+ return Optional .of (recordClassName );
540+ } catch (Exception e ) {
541+ log .debug ("Failed to parse AVRO schema as JSON" , e );
542+ }
543+ return Optional .empty ();
544+ }
545+
546+ /**
547+ * Extracts the topic name from a subject name based on the naming strategy.
548+ *
549+ * @param subjectName The subject name (assumed to be not empty)
550+ * @param strategy The naming strategy
551+ * @return The topic name if it can be determined
552+ */
553+ public static Optional <String > extractTopicName (String subjectName , SubjectNameStrategy strategy ) {
554+ switch (strategy ) {
555+ case TOPIC_NAME :
556+ return Optional .of (subjectName .replaceAll ("(-key|-value)$" , "" ));
557+ case TOPIC_RECORD_NAME :
558+ int indexOfLastDash = subjectName .lastIndexOf ("-" );
559+ return (indexOfLastDash == -1 )
560+ ? Optional .empty ()
561+ : Optional .of (subjectName .substring (0 , indexOfLastDash ));
562+ default :
563+ return Optional .empty ();
564+ }
565+ }
566+
567+ /**
568+ * Extracts the topic name from a subject name according to the allowed strategies.
569+ *
570+ * @param subjectName The subject name (assumed to be not empty)
571+ * @param strategies The list of strategies to try
572+ * @return The topic name if it can be determined by any of the strategies
573+ */
574+ public static Optional <String > extractTopicName (String subjectName , List <SubjectNameStrategy > strategies ) {
575+ for (SubjectNameStrategy strategy : strategies ) {
576+ Optional <String > topicName = extractTopicName (subjectName , strategy );
577+ if (topicName .isPresent ()) {
578+ return topicName ;
579+ }
580+ }
581+ return Optional .empty ();
582+ }
583+
584+
455585}
0 commit comments