Skip to content

Commit a996791

Browse files
committed
[refactor] update iceberg poc
1 parent e8ea06f commit a996791

File tree

4 files changed

+58
-4
lines changed

4 files changed

+58
-4
lines changed

docker/iceberg/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
```sh
$ git clone git@github.com:tabular-io/docker-spark-iceberg.git
$ cd docker-spark-iceberg
$ docker-compose up -d
$ docker exec -it spark-iceberg pyspark
```

docker/iceberg/weather-table.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
# PoC script for an Iceberg "climate.weather" table.
# Intended to run inside a pyspark shell/notebook (e.g. the spark-iceberg
# container), where `spark` (a SparkSession) is already in scope; the
# statements mirror the `%%sql` notebook magics they replace.

# --- Create database and table ---------------------------------------------
spark.sql("CREATE DATABASE IF NOT EXISTS climate")

# Triple-quoted string instead of backslash line-continuations: a stray
# space after a trailing backslash would silently break the statement.
spark.sql("""
    CREATE TABLE IF NOT EXISTS climate.weather (
        datetime timestamp,
        temp double,
        lat double,
        long double,
        cloud_coverage string,
        precip double,
        wind_speed double
    )
    USING iceberg
    PARTITIONED BY (days(datetime))
""")

# --- Write sample data ------------------------------------------------------
from datetime import datetime

# Reuse the table's schema so the tuples below are coerced to the right types.
schema = spark.table("climate.weather").schema
data = [
    (datetime(2023, 8, 16), 76.2, 40.951908, -74.075272, "Partially sunny", 0.0, 3.5),
    (datetime(2023, 8, 17), 82.5, 40.951908, -74.075272, "Sunny", 0.0, 1.2),
    (datetime(2023, 8, 18), 70.9, 40.951908, -74.075272, "Cloudy", 0.5, 5.2),
]
df = spark.createDataFrame(data, schema)
df.writeTo("climate.weather").append()


# --- Read data back via PyIceberg -------------------------------------------
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqual

# NOTE(review): assumes a "default" catalog is configured in .pyiceberg.yaml
# or the environment — confirm in the container setup.
catalog = load_catalog("default")
tbl = catalog.load_table("climate.weather")

sc = tbl.scan(row_filter=GreaterThanOrEqual("datetime", "2023-08-01T00:00:00.000000+00:00"))
df = sc.to_arrow().to_pandas()

k8s/risingwave/cmd/kinesisToIceberg.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@ import (
1010
)
1111

1212
// Connection details and dotsql query tags for the Kinesis-to-Iceberg job.
const (
	// RisingWave endpoint, spoken over the Postgres wire protocol.
	connStr = "postgres://root@localhost:4567/dev"
	// File containing the named SQL statements loaded via dotsql.
	sqlFile = "./sql/kinesis-to-iceberg.sql"
	// Names of the tagged statements executed from sqlFile.
	sourceJobName = "create-kinesis-source"
	sinkJobName   = "create-iceberg-sink"
)
1618

1719
func main() {
@@ -26,13 +28,13 @@ func main() {
2628
log.Fatalf("Error loading sql: %v\n", err)
2729
}
2830
// Source
29-
_, err = dot.Exec(db, "create-kinesis-source")
31+
_, err = dot.Exec(db, sourceJobName)
3032
if err != nil {
3133
log.Fatalf("Error executing create table: %v\n", err)
3234
}
3335

3436
// Sink
35-
_, err = dot.Exec(db, "create-iceberg-sink")
37+
_, err = dot.Exec(db, sinkJobName)
3638
if err != nil {
3739
log.Fatalf("Error executing create sink: %v\n", err)
3840
}

0 commit comments

Comments
 (0)