Skip to content

Commit 610544b

Browse files
committed
Add support for defining schemas to be used by the sinks
Right now the Kafka sink does not support the use of the defined schemas, but these allow for defining valid/acceptable schemas up front for data written to specific topics. What this will _not_ do, however, is any form of type coercion! Make sure the schemas are the right types for the data coming in!
1 parent 7b2a969 commit 610544b

File tree

9 files changed

+295
-43
lines changed

9 files changed

+295
-43
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "hotdog"
3-
version = "1.0.2"
3+
version = "1.1.0"
44
authors = ["R. Tyler Croy <rtyler@buoyantdata.com>"]
55
edition = "2024"
66

@@ -47,6 +47,7 @@ rdkafka = { version = "0", features = ["ssl", "sasl"]}
4747
object_store = { version = "0.12", features = ["aws", "cloud"] }
4848
arrow-array = "55"
4949
arrow-json = "55"
50+
arrow-schema = "55"
5051
parquet = { version = "55", features = ["arrow", "async", "object_store"]}
5152

5253
# Used for rule matching

src/connection.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ impl Connection {
8484

8585
while let Some(line) = lines.next().await {
8686
let line = line?;
87-
debug!("log: {}", line);
87+
trace!("log: {}", line);
8888

8989
let parsed = parse::parse_line(line);
9090

@@ -102,7 +102,7 @@ impl Connection {
102102
let mut msg = parsed.unwrap();
103103
self.stats.counter(Stats::LineReceived.into()).count(1);
104104
let mut continue_rules = true;
105-
debug!("parsed as: {}", msg.msg);
105+
trace!("parsed as: {}", msg.msg);
106106

107107
for rule in self.settings.rules.iter() {
108108
/*

src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ mod json;
1818
mod merge;
1919
mod parse;
2020
mod rules;
21+
mod schema;
2122
mod serve;
2223
mod settings;
2324
mod sink;

src/schema.rs

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
//!
2+
//! Simple schema mappings to help turn defined schemas into Arrow schemas
3+
4+
use arrow_schema::DataType;
5+
use serde::Deserialize;
6+
use tracing::log::*;
7+
8+
use std::collections::HashMap;
9+
10+
/// Supported schema definition types for the configuration of hotdog
11+
#[derive(Clone, Debug, Deserialize, PartialEq)]
12+
#[serde(rename_all = "lowercase")]
13+
pub enum FieldType {
14+
String,
15+
Struct,
16+
Long,
17+
Integer,
18+
Timestamp,
19+
Float,
20+
Boolean,
21+
}
22+
23+
/// Helpful converter for [arrow_schema::Schema] conversion
24+
impl Into<DataType> for &FieldType {
25+
fn into(self) -> DataType {
26+
match self {
27+
FieldType::String => DataType::Utf8,
28+
FieldType::Boolean => DataType::Boolean,
29+
FieldType::Integer => DataType::Int32,
30+
FieldType::Long => DataType::Int64,
31+
FieldType::Float => DataType::Float64,
32+
FieldType::Timestamp => DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
33+
unknown => {
34+
warn!(
35+
"Asked to convert a schema type which cannot be automatically converted: {unknown:?}"
36+
);
37+
DataType::Null
38+
}
39+
}
40+
}
41+
}
42+
43+
/// A field in the schema, which supports recursion. The name of the field is expected to be known
44+
/// by the container of the field
45+
#[derive(Clone, Debug, Deserialize)]
46+
pub struct Field {
47+
r#type: FieldType,
48+
fields: Option<HashMap<String, Field>>,
49+
}
50+
51+
impl Field {
52+
/// Retrieve the given [FieldType]
53+
pub fn get_type(&self) -> &FieldType {
54+
&self.r#type
55+
}
56+
57+
/// Retrieve the nested fields should they exist
58+
pub fn fields(&self) -> Option<&HashMap<String, Field>> {
59+
self.fields.as_ref()
60+
}
61+
}
62+
63+
/// Convert the given set of fields to an [arrow_schema::Schema] for use elsewhere inside of hotdog
64+
pub fn into_arrow_schema(
65+
fields: &HashMap<String, Field>,
66+
) -> Result<arrow_schema::Schema, arrow_schema::ArrowError> {
67+
let mut arrow_fields = vec![];
68+
69+
for (name, field) in fields {
70+
if field.get_type() == &FieldType::Struct {
71+
if let Some(fields) = field.fields.as_ref() {
72+
let struct_fields = into_arrow_schema(fields)?;
73+
arrow_fields.push(arrow_schema::Field::new(
74+
name,
75+
DataType::Struct(struct_fields.fields),
76+
true,
77+
));
78+
} else {
79+
error!(
80+
"I don't know how to handle structs without fields! Calling it null :shrug:"
81+
);
82+
arrow_fields.push(arrow_schema::Field::new(name, DataType::Null, true));
83+
}
84+
} else {
85+
arrow_fields.push(arrow_schema::Field::new(
86+
name,
87+
field.get_type().into(),
88+
true,
89+
));
90+
}
91+
}
92+
93+
Ok(arrow_schema::Schema::new(arrow_fields))
94+
}
95+
96+
#[cfg(test)]
97+
mod tests {
98+
use super::*;
99+
100+
#[test]
101+
fn test_deser_simple_field() {
102+
let buf = r#"
103+
---
104+
version:
105+
type: string
106+
"#;
107+
let fields: HashMap<String, Field> =
108+
serde_yaml::from_str(buf).expect("Failed to deserialize a simple string field");
109+
110+
if let Some(field) = fields.get("version") {
111+
assert_eq!(field.get_type(), &FieldType::String);
112+
} else {
113+
panic!("Failed to get a version field");
114+
}
115+
}
116+
117+
#[test]
118+
fn test_deser_nested_map() {
119+
let buf = r#"
120+
---
121+
meta:
122+
type: struct
123+
fields:
124+
version:
125+
type: string
126+
"#;
127+
let fields: HashMap<String, Field> =
128+
serde_yaml::from_str(buf).expect("Failed to deserialize a nested field");
129+
130+
if let Some(field) = fields.get("meta") {
131+
assert_eq!(field.get_type(), &FieldType::Struct);
132+
133+
let nested = field
134+
.fields
135+
.as_ref()
136+
.expect("Failed to get the nested fields");
137+
assert_eq!(
138+
nested.get("version").unwrap().get_type(),
139+
&FieldType::String
140+
);
141+
} else {
142+
panic!("Failed to get a timestamp field");
143+
}
144+
}
145+
146+
/// The whole point of this schema definition is to get to an arrow schema definition!
147+
#[test]
148+
fn test_convert_simple_to_arrow() {
149+
let buf = r#"
150+
---
151+
version:
152+
type: string
153+
"#;
154+
let fields: HashMap<String, Field> =
155+
serde_yaml::from_str(buf).expect("Failed to deserialize a nested field");
156+
157+
let arr: arrow_schema::Schema =
158+
into_arrow_schema(&fields).expect("This is a valid arrow schema");
159+
assert_ne!(
160+
arr.fields.len(),
161+
0,
162+
"The converted schema should have one field"
163+
);
164+
}
165+
166+
#[test]
167+
fn test_convert_nested_to_arrow() {
168+
let buf = r#"
169+
---
170+
meta:
171+
type: struct
172+
fields:
173+
version:
174+
type: string
175+
"#;
176+
let fields: HashMap<String, Field> =
177+
serde_yaml::from_str(buf).expect("Failed to deserialize a nested field");
178+
179+
let arr: arrow_schema::Schema =
180+
into_arrow_schema(&fields).expect("This is a valid arrow schema");
181+
182+
let (_size, field) = arr
183+
.fields
184+
.find("meta")
185+
.expect("Failed to find the meta field");
186+
187+
let struct_fields = arrow_schema::Fields::from(vec![arrow_schema::Field::new(
188+
"version",
189+
DataType::Utf8,
190+
true,
191+
)]);
192+
let struct_type = DataType::Struct(struct_fields);
193+
194+
assert_eq!(field.data_type(), &struct_type);
195+
}
196+
}

src/serve/mod.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,11 @@ pub trait Server {
9292
// If the Kafka sink is defined in the configuration, then spin up the configuration
9393
if let Some(kafka_conf) = &state.settings.global.kafka {
9494
info!("Configuring a Kafka sink with: {kafka_conf:?}");
95-
let mut kafka = Kafka::new(kafka_conf.clone(), state.stats.clone());
95+
let mut kafka = Kafka::new(
96+
kafka_conf.clone(),
97+
&state.settings.schemas,
98+
state.stats.clone(),
99+
);
96100

97101
kafka.bootstrap().await;
98102
sender = Some(kafka.get_sender());
@@ -106,7 +110,11 @@ pub trait Server {
106110

107111
if let Some(parquet_conf) = &state.settings.global.parquet {
108112
info!("Configuring a Parquet sink with: {parquet_conf:?}");
109-
let pq = Parquet::new(parquet_conf.clone(), state.stats.clone());
113+
let pq = Parquet::new(
114+
parquet_conf.clone(),
115+
&state.settings.schemas,
116+
state.stats.clone(),
117+
);
110118
sender = Some(pq.get_sender());
111119
smol::spawn(async move {
112120
debug!("Starting Parquet loop");

src/settings.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use serde_json::Value;
44
use tracing::log::*;
55
use uuid::Uuid;
66

7+
use std::collections::HashMap;
78
use std::path::Path;
89

910
pub fn load(file: &str) -> Settings {
@@ -159,23 +160,37 @@ pub struct Global {
159160
pub status: Option<Status>,
160161
}
161162

163+
/// Schema is a definition of an Arrow schema which can be used for deserialization
164+
#[derive(Debug, Deserialize)]
165+
pub struct Schema {
166+
/// Output topic this schema applies to
167+
pub topic: String,
168+
/// Fields of the actual schema
169+
pub fields: HashMap<String, crate::schema::Field>,
170+
}
171+
162172
#[derive(Debug, Deserialize)]
163173
pub struct Settings {
164174
pub global: Global,
165175
pub rules: Vec<Rule>,
176+
#[serde(default = "default_schemas")]
177+
/// Optionally defined schemas for this configuration
178+
pub schemas: Vec<Schema>,
166179
}
167180

168181
impl Settings {
169-
/**
170-
* Populate any configuration caches which we want to us
171-
*/
182+
/// Populate any configuration caches which we want to use
172183
fn populate_caches(&mut self) {
173184
self.rules.iter_mut().for_each(|rule| {
174185
rule.populate_caches();
175186
});
176187
}
177188
}
178189

190+
fn default_schemas() -> Vec<Schema> {
191+
vec![]
192+
}
193+
179194
fn default_none<T>() -> Option<T> {
180195
None
181196
}

src/sink/kafka.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ pub struct Kafka {
4141
impl Sink for Kafka {
4242
type Config = Config;
4343

44-
fn new(config: Config, stats: InputQueueScope) -> Self {
44+
fn new(config: Config, _schemas: &[crate::settings::Schema], stats: InputQueueScope) -> Self {
4545
let (tx, rx) = bounded(config.buffer);
4646
Kafka {
4747
producer: None,
@@ -242,7 +242,7 @@ mod tests {
242242

243243
let bucket = dipstick::AtomicBucket::new();
244244
let stats = InputQueueScope::wrap(bucket.clone(), 100);
245-
let mut k = Kafka::new(config, stats);
245+
let mut k = Kafka::new(config, &[], stats);
246246
smol::block_on(k.bootstrap());
247247
}
248248

src/sink/mod.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@ pub trait Sink: Send + Sync {
2424
/// Construct the Sink.
2525
///
2626
/// This function should not do anything but initialize settings and variables
27-
fn new(config: Self::Config, stats: InputQueueScope) -> Self;
27+
fn new(
28+
config: Self::Config,
29+
schemas: &[crate::settings::Schema],
30+
stats: InputQueueScope,
31+
) -> Self;
2832

2933
/// Bootstrap the sink
3034
///
@@ -77,7 +81,11 @@ mod tests {
7781
impl Sink for TestSink {
7882
type Config = Option<()>;
7983

80-
fn new(config: Option<()>, _stats: InputQueueScope) -> Self {
84+
fn new(
85+
config: Option<()>,
86+
_schemas: &[crate::settings::Schema],
87+
_stats: InputQueueScope,
88+
) -> Self {
8189
Self { config }
8290
}
8391

0 commit comments

Comments
 (0)