Skip to content

Commit 23a1ff9

Browse files
authored
feat: add has_data in system.streams (#18920)
show streams add has_data
1 parent a32d6d4 commit 23a1ff9

File tree

5 files changed

+25
-11
lines changed

5 files changed

+25
-11
lines changed

src/query/service/tests/it/storages/testdata/columns_table.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo
200200
| 'file_name' | 'system' | 'temp_files' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
201201
| 'file_type' | 'system' | 'temp_files' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
202202
| 'group' | 'system' | 'configs' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
203+
| 'has_data' | 'system' | 'streams' | 'Boolean' | 'BOOLEAN' | '' | '' | 'NO' | '' |
203204
| 'histogram' | 'system' | 'statistics' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
204205
| 'hit' | 'system' | 'caches' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' |
205206
| 'host' | 'system' | 'clusters' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |

src/query/sql/src/planner/binder/ddl/stream.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ impl Binder {
142142
.with_column("owner")
143143
.with_column("comment")
144144
.with_column("mode")
145-
.with_column("invalid_reason");
145+
.with_column("invalid_reason")
146+
.with_column("has_data");
146147
} else {
147148
select_builder
148149
.with_column(format!("name AS `Streams_in_{database}`"))
@@ -210,7 +211,8 @@ impl Binder {
210211
.with_column("owner")
211212
.with_column("comment")
212213
.with_column("mode")
213-
.with_column("invalid_reason");
214+
.with_column("invalid_reason")
215+
.with_column("has_data");
214216
select_builder.with_filter(format!("catalog = '{catalog}'"));
215217
select_builder.with_filter(format!("database = '{database}'"));
216218
select_builder.with_filter(format!("name = '{stream}'"));

src/query/storages/system/src/streams_table.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use databend_common_catalog::table::Table;
2525
use databend_common_catalog::table_context::TableContext;
2626
use databend_common_exception::Result;
2727
use databend_common_expression::types::number::UInt64Type;
28+
use databend_common_expression::types::BooleanType;
2829
use databend_common_expression::types::NumberDataType;
2930
use databend_common_expression::types::StringType;
3031
use databend_common_expression::types::TimestampType;
@@ -104,6 +105,7 @@ impl<const T: bool> AsyncSystemTable for StreamsTable<T> {
104105
let mut updated_on = vec![];
105106
let mut table_version = vec![];
106107
let mut snapshot_location = vec![];
108+
let mut has_data = vec![];
107109

108110
let max_threads = ctx.get_settings().get_max_threads()? as usize;
109111
let io_request_semaphore = Arc::new(Semaphore::new(max_threads));
@@ -235,7 +237,8 @@ impl<const T: bool> AsyncSystemTable for StreamsTable<T> {
235237
}
236238
comment.push(stream_info.meta.comment.clone());
237239

238-
table_version.push(stream_table.offset().ok());
240+
let offset = stream_table.offset().ok();
241+
table_version.push(offset);
239242
table_id.push(source_tb_id);
240243
snapshot_location.push(stream_table.snapshot_loc());
241244

@@ -244,6 +247,7 @@ impl<const T: bool> AsyncSystemTable for StreamsTable<T> {
244247
let table = table.clone();
245248
let handler = runtime.spawn(async move {
246249
let mut reason = "".to_string();
250+
let mut has_data = false;
247251
// safe unwrap.
248252
let stream_table = StreamTable::try_from_table(table.as_ref()).unwrap();
249253
match stream_table.source_table(ctx).await {
@@ -258,23 +262,28 @@ impl<const T: bool> AsyncSystemTable for StreamsTable<T> {
258262
.err()
259263
.map_or("".to_string(), |e| e.display_text());
260264
}
265+
if let Some(ver) = offset {
266+
has_data = fuse_table.get_table_info().ident.seq != ver;
267+
}
261268
}
262269
Err(e) => {
263270
reason = e.display_text();
264271
}
265272
}
266273
drop(permit);
267-
reason
274+
(reason, has_data)
268275
});
269276
handlers.push(handler);
270277
}
271278
}
272279
}
273280

274-
let mut joint = futures::future::try_join_all(handlers)
281+
let joint = futures::future::try_join_all(handlers)
275282
.await
276283
.unwrap_or_default();
277-
invalid_reason.append(&mut joint);
284+
let (mut reasons, mut flags): (Vec<String>, Vec<bool>) = joint.into_iter().unzip();
285+
invalid_reason.append(&mut reasons);
286+
has_data.append(&mut flags);
278287
}
279288

280289
let mut source_db_ids = source_db_id_set.into_iter().collect::<Vec<u64>>();
@@ -328,6 +337,7 @@ impl<const T: bool> AsyncSystemTable for StreamsTable<T> {
328337
UInt64Type::from_opt_data(table_version),
329338
StringType::from_opt_data(snapshot_location),
330339
StringType::from_data(invalid_reason),
340+
BooleanType::from_data(has_data),
331341
StringType::from_opt_data(owner),
332342
]))
333343
} else {
@@ -375,6 +385,7 @@ impl<const T: bool> StreamsTable<T> {
375385
TableDataType::Nullable(Box::new(TableDataType::String)),
376386
),
377387
TableField::new("invalid_reason", TableDataType::String),
388+
TableField::new("has_data", TableDataType::Boolean),
378389
TableField::new(
379390
"owner",
380391
TableDataType::Nullable(Box::new(TableDataType::String)),

tests/suites/5_ee/05_stream/05_0000_ee_stream.result

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@ true
22
2 INSERT false true
33
test_s db_stream.t append_only
44
test_s1 db_stream.t standard
5-
test_s default default db_stream.t NULL test append_only
6-
test_s1 default default db_stream.t NULL standard standard
7-
test_s default default db_stream.t NULL test append_only
5+
test_s default default db_stream.t NULL test append_only true
6+
test_s1 default default db_stream.t NULL standard standard true
7+
test_s default default db_stream.t NULL test append_only true

tests/suites/5_ee/05_stream/05_0000_ee_stream.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ echo "select a, change\$action, change\$is_update, change\$row_id='$BASE_ROW_ID'
1818

1919
echo "create stream test_s1 on table db_stream.t at(stream => default.test_s) append_only=false comment = 'standard'" | $BENDSQL_CLIENT_CONNECT
2020
echo "show streams like 'test_s%'" | $BENDSQL_CLIENT_CONNECT
21-
echo "show full streams like 'test_s%'" | $BENDSQL_CLIENT_CONNECT | awk '{print $(NF-6), $(NF-5), $(NF-4), $(NF-3), $(NF-2), $(NF-1), $NF}'
22-
echo "desc stream default.test_s" | $BENDSQL_CLIENT_CONNECT | awk '{print $(NF-6), $(NF-5), $(NF-4), $(NF-3), $(NF-2), $(NF-1), $NF}'
21+
echo "show full streams like 'test_s%'" | $BENDSQL_CLIENT_CONNECT | awk '{print $(NF-7), $(NF-6), $(NF-5), $(NF-4), $(NF-3), $(NF-2), $(NF-1), $NF}'
22+
echo "desc stream default.test_s" | $BENDSQL_CLIENT_CONNECT | awk '{print $(NF-7), $(NF-6), $(NF-5), $(NF-4), $(NF-3), $(NF-2), $(NF-1), $NF}'
2323

2424
echo "drop stream if exists default.test_s" | $BENDSQL_CLIENT_CONNECT
2525
echo "drop stream if exists default.test_s1" | $BENDSQL_CLIENT_CONNECT

0 commit comments

Comments
 (0)