Skip to content

Commit 6475317

Browse files
authored
Add schema migration example (confluentinc#2244)
1 parent 7c4a4ca commit 6475317

File tree

4 files changed

+230
-1
lines changed

4 files changed

+230
-1
lines changed

Diff for: Confluent.Kafka.sln

+15
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AvroGenericEncryption", "ex
9393
EndProject
9494
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ProtobufEncryption", "examples\ProtobufEncryption\ProtobufEncryption.csproj", "{6988FB1F-3648-4E5E-821F-55D67CA00FD7}"
9595
EndProject
96+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AvroGenericMigration", "examples\AvroGenericMigration\AvroGenericMigration.csproj", "{10CD6000-59A3-40C9-905F-20F4EE03C1B4}"
97+
EndProject
9698
Global
9799
GlobalSection(SolutionConfigurationPlatforms) = preSolution
98100
Debug|Any CPU = Debug|Any CPU
@@ -610,6 +612,18 @@ Global
610612
{6988FB1F-3648-4E5E-821F-55D67CA00FD7}.Release|x64.Build.0 = Release|Any CPU
611613
{6988FB1F-3648-4E5E-821F-55D67CA00FD7}.Release|x86.ActiveCfg = Release|Any CPU
612614
{6988FB1F-3648-4E5E-821F-55D67CA00FD7}.Release|x86.Build.0 = Release|Any CPU
615+
{10CD6000-59A3-40C9-905F-20F4EE03C1B4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
616+
{10CD6000-59A3-40C9-905F-20F4EE03C1B4}.Debug|Any CPU.Build.0 = Debug|Any CPU
617+
{10CD6000-59A3-40C9-905F-20F4EE03C1B4}.Debug|x64.ActiveCfg = Debug|Any CPU
618+
{10CD6000-59A3-40C9-905F-20F4EE03C1B4}.Debug|x64.Build.0 = Debug|Any CPU
619+
{10CD6000-59A3-40C9-905F-20F4EE03C1B4}.Debug|x86.ActiveCfg = Debug|Any CPU
620+
{10CD6000-59A3-40C9-905F-20F4EE03C1B4}.Debug|x86.Build.0 = Debug|Any CPU
621+
{10CD6000-59A3-40C9-905F-20F4EE03C1B4}.Release|Any CPU.ActiveCfg = Release|Any CPU
622+
{10CD6000-59A3-40C9-905F-20F4EE03C1B4}.Release|Any CPU.Build.0 = Release|Any CPU
623+
{10CD6000-59A3-40C9-905F-20F4EE03C1B4}.Release|x64.ActiveCfg = Release|Any CPU
624+
{10CD6000-59A3-40C9-905F-20F4EE03C1B4}.Release|x64.Build.0 = Release|Any CPU
625+
{10CD6000-59A3-40C9-905F-20F4EE03C1B4}.Release|x86.ActiveCfg = Release|Any CPU
626+
{10CD6000-59A3-40C9-905F-20F4EE03C1B4}.Release|x86.Build.0 = Release|Any CPU
613627
EndGlobalSection
614628
GlobalSection(NestedProjects) = preSolution
615629
{09C3255B-1972-4EB8-91D0-FB9F5CD82BCB} = {1EFCD839-0726-4BCE-B745-1E829991B1BC}
@@ -651,5 +665,6 @@ Global
651665
{222965B5-B263-4F2C-B629-F3AA5B3A82AF} = {9CE4B5F7-9251-4340-BACB-207066A5DBE8}
652666
{6727B941-3E07-4841-84E0-8EE47E04A3B3} = {9CE4B5F7-9251-4340-BACB-207066A5DBE8}
653667
{6988FB1F-3648-4E5E-821F-55D67CA00FD7} = {9CE4B5F7-9251-4340-BACB-207066A5DBE8}
668+
{10CD6000-59A3-40C9-905F-20F4EE03C1B4} = {9CE4B5F7-9251-4340-BACB-207066A5DBE8}
654669
EndGlobalSection
655670
EndGlobal

Diff for: examples/AvroGenericEncryption/Program.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
using System.Threading.Tasks;
3232

3333

34-
namespace Confluent.Kafka.Examples.AvroGeneric
34+
namespace Confluent.Kafka.Examples.AvroGenericEncryption
3535
{
3636
class Program
3737
{
+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<ProjectTypeGuids>{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}</ProjectTypeGuids>
5+
<AssemblyName>AvroGenericMigration</AssemblyName>
6+
<OutputType>Exe</OutputType>
7+
<TargetFramework>net6.0</TargetFramework>
8+
<LangVersion>7.1</LangVersion>
9+
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="2.4.0" /> -->
13+
<ProjectReference Include="../../src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj" />
14+
<ProjectReference Include="../../src/Confluent.SchemaRegistry.Rules/Confluent.SchemaRegistry.Rules.csproj" />
15+
</ItemGroup>
16+
17+
</Project>

Diff for: examples/AvroGenericMigration/Program.cs

+197
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
// Copyright 2024 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using Avro;
18+
using Avro.Generic;
19+
using Confluent.Kafka.SyncOverAsync;
20+
using Confluent.SchemaRegistry.Serdes;
21+
using Confluent.SchemaRegistry;
22+
using Schema = Confluent.SchemaRegistry.Schema;
23+
using System;
24+
using System.Collections.Generic;
25+
using System.Threading;
26+
using System.Threading.Tasks;
27+
using Confluent.SchemaRegistry.Rules;
28+
29+
30+
namespace Confluent.Kafka.Examples.AvroGenericMigration
31+
{
32+
class Program
33+
{
34+
static async Task Main(string[] args)
35+
{
36+
if (args.Length != 3)
37+
{
38+
Console.WriteLine("Usage: .. bootstrapServers schemaRegistryUrl topicName");
39+
return;
40+
}
41+
42+
// Register the KMS drivers and the field encryption executor
43+
JsonataExecutor.Register();
44+
45+
string bootstrapServers = args[0];
46+
string schemaRegistryUrl = args[1];
47+
string topicName = args[2];
48+
string subjectName = topicName + "-value";
49+
string groupName = "avro-generic-example-group";
50+
51+
var avroSerializerConfig = new AvroSerializerConfig
52+
{
53+
AutoRegisterSchemas = false,
54+
UseLatestWithMetadata = new Dictionary<string, string>()
55+
{
56+
["application.major.version"] = "1"
57+
},
58+
// optional Avro serializer properties:
59+
BufferBytes = 100
60+
};
61+
62+
var avroDeserializerConfig = new AvroDeserializerConfig
63+
{
64+
UseLatestWithMetadata = new Dictionary<string, string>()
65+
{
66+
["application.major.version"] = "2"
67+
}
68+
};
69+
70+
var s = (RecordSchema)RecordSchema.Parse(
71+
@"{
72+
""type"": ""record"",
73+
""name"": ""User"",
74+
""fields"": [
75+
{""name"": ""name"", ""type"": ""string"", ""confluent:tags"": [""PII""]},
76+
{""name"": ""favorite_number"", ""type"": ""long""},
77+
{""name"": ""favorite_color"", ""type"": ""string""}
78+
]
79+
}"
80+
);
81+
82+
Confluent.SchemaRegistry.Metadata metadata = new Confluent.SchemaRegistry.Metadata(
83+
null,
84+
new Dictionary<string, string>
85+
{
86+
["application.major.version"] = "1",
87+
},
88+
null
89+
);
90+
Schema schema = new Schema(s.ToString(), null, SchemaType.Avro, metadata, null);
91+
92+
var s2 = (RecordSchema)RecordSchema.Parse(
93+
@"{
94+
""type"": ""record"",
95+
""name"": ""User"",
96+
""fields"": [
97+
{""name"": ""name"", ""type"": ""string"", ""confluent:tags"": [""PII""]},
98+
{""name"": ""fave_num"", ""type"": ""long""},
99+
{""name"": ""favorite_color"", ""type"": ""string""}
100+
]
101+
}"
102+
);
103+
104+
Confluent.SchemaRegistry.Metadata metadata2 = new Confluent.SchemaRegistry.Metadata(
105+
null,
106+
new Dictionary<string, string>
107+
{
108+
["application.major.version"] = "2",
109+
},
110+
null
111+
);
112+
String expr = "$merge([$sift($, function($v, $k) {$k != 'favorite_number'}), {'fave_num': $.'favorite_number'}])";
113+
RuleSet ruleSet = new RuleSet(new List<Rule>
114+
{
115+
new Rule("upgrade", RuleKind.Transform, RuleMode.Upgrade, "JSONATA", null, null,
116+
expr, null, null, false)
117+
}, new List<Rule>()
118+
);
119+
Schema schema2 = new Schema(s2.ToString(), null, SchemaType.Avro, metadata2, ruleSet);
120+
121+
CancellationTokenSource cts = new CancellationTokenSource();
122+
var consumeTask = Task.Run(() =>
123+
{
124+
using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = schemaRegistryUrl }))
125+
using (var consumer =
126+
new ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupName })
127+
.SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry, avroDeserializerConfig).AsSyncOverAsync())
128+
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
129+
.Build())
130+
{
131+
consumer.Subscribe(topicName);
132+
133+
try
134+
{
135+
while (true)
136+
{
137+
try
138+
{
139+
var consumeResult = consumer.Consume(cts.Token);
140+
141+
Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Message.Value}");
142+
}
143+
catch (ConsumeException e)
144+
{
145+
Console.WriteLine($"Consume error: {e.Error.Reason}");
146+
}
147+
}
148+
}
149+
catch (OperationCanceledException)
150+
{
151+
// commit final offsets and leave the group.
152+
consumer.Close();
153+
}
154+
}
155+
});
156+
157+
using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = schemaRegistryUrl }))
158+
using (var producer =
159+
new ProducerBuilder<string, GenericRecord>(new ProducerConfig { BootstrapServers = bootstrapServers })
160+
.SetValueSerializer(new AvroSerializer<GenericRecord>(schemaRegistry, avroSerializerConfig))
161+
.Build())
162+
{
163+
var c = schemaRegistry.UpdateCompatibilityAsync(Compatibility.None, null).Result;
164+
var id = schemaRegistry.RegisterSchemaAsync(subjectName, schema, true).Result;
165+
var id2 = schemaRegistry.RegisterSchemaAsync(subjectName, schema2, true).Result;
166+
167+
Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");
168+
169+
long i = 1;
170+
string text;
171+
while ((text = Console.ReadLine()) != "q")
172+
{
173+
var record = new GenericRecord(s);
174+
record.Add("name", text);
175+
record.Add("favorite_number", i++);
176+
record.Add("favorite_color", "blue");
177+
178+
try
179+
{
180+
var dr = await producer.ProduceAsync(topicName, new Message<string, GenericRecord> { Key = text, Value = record });
181+
Console.WriteLine($"produced to: {dr.TopicPartitionOffset}");
182+
}
183+
catch (ProduceException<string, GenericRecord> ex)
184+
{
185+
// In some cases (notably Schema Registry connectivity issues), the InnerException
186+
// of the ProduceException contains additional informatiom pertaining to the root
187+
// cause of the problem. This information is automatically included in the output
188+
// of the ToString() method of the ProduceException, called implicitly in the below.
189+
Console.WriteLine($"error producing message: {ex}");
190+
}
191+
}
192+
}
193+
194+
cts.Cancel();
195+
}
196+
}
197+
}

0 commit comments

Comments
 (0)