
Commit 13fae3d (parent 590f681)

added spark_processing and sending data to cassandra

7 files changed: +45 −48 lines

.gitignore (+4 −1)

@@ -1,5 +1,8 @@
 .gitignore
 random-name-api-venv/
+dags/__pycache__
 logs/
+plugins/
 config/
-dags/__pycache__/kafka_stream.cpython-38.pyc
+spark/
+cassandra/

Dockerfile (+6 −4)

@@ -1,12 +1,14 @@
 FROM apache/airflow:2.9.0-python3.11
+COPY requirements.txt /
 
 USER root
-RUN sudo apt-get update && sudo apt-get install -y gcc libpq-dev
+RUN sudo apt-get update && sudo apt-get install -y gcc libpq-dev \
+    openjdk-17-jre-headless
 
 USER airflow
-ADD requirements.txt .
-RUN pip install -r requirements.txt
-
+ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
+RUN pip3 install "apache-airflow==2.9.0" apache-airflow-providers-apache-spark==4.7.1
+RUN pip3 install --no-cache-dir -r /requirements.txt

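The new JRE and JAVA_HOME exist so that the Spark provider's spark-submit can find a JVM inside the Airflow image. A quick way to confirm the toolchain landed is a sketch like the following, run inside the container (the compose service name is whatever your file calls the Airflow container):

# Hypothetical sanity check, e.g. via
# `docker compose exec <airflow-service> python check_env.py`.
import os
import shutil

print("JAVA_HOME =", os.environ.get("JAVA_HOME"))      # set by the Dockerfile
print("java:", shutil.which("java"))                   # from openjdk-17-jre-headless
print("spark-submit:", shutil.which("spark-submit"))   # from the Spark provider install
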
dags/dags.py (−7)

@@ -1,7 +1,5 @@
 from airflow import DAG
 from airflow.operators.python_operator import PythonOperator
-from airflow.providers.apache.spark.operators \
-    .spark_submit import SparkSubmitOperator
 
 from kafka_stream import stream_data
 from datetime import datetime
@@ -22,9 +20,4 @@
     python_callable=stream_data,
     dag=dag)
 
-processing_task = SparkSubmitOperator(
-    task_id='process_data',
-    application='/opt/airflow/dags/spark_processing.py',
-    dag=dag)
-
 streaming_task
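
With the SparkSubmitOperator gone, the DAG only schedules the Kafka producer, and spark_processing.py has to be launched outside Airflow. If the task were wired back in later, it would look roughly like the removed block plus a connection and the connector packages this commit also drops from the session builder. A sketch, where conn_id and the package coordinates are assumptions, not part of this commit:

# Hypothetical re-wiring of the removed task; assumes an Airflow connection
# 'spark_default' pointing at the compose file's Spark master.
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

processing_task = SparkSubmitOperator(
    task_id='process_data',
    application='/opt/airflow/dags/spark_processing.py',
    conn_id='spark_default',
    # spark.jars.packages was removed from create_spark_session, so the
    # connector jars would have to be supplied at submit time instead:
    packages='org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.2,'
             'com.datastax.spark:spark-cassandra-connector_2.12:3.4.1',
    dag=dag,
)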

dags/kafka_stream.py (+7 −13)

@@ -1,30 +1,21 @@
-from kafka import KafkaProducer, KafkaConsumer
+from kafka import KafkaProducer
 
 import json
 import requests
 
 
 def create_kafka_producer():
-
-    producer = KafkaProducer(bootstrap_servers=['kafka:29092'],
+    producer = KafkaProducer(bootstrap_servers=['kafka:9092'],
                              value_serializer=lambda x: json.dumps(x)
                              .encode('utf-8'))
     return producer
 
 
-def create_kafka_consumer(topic):
-    consumer = KafkaConsumer(topic, bootstrap_servers=['kafka:29092'],
-                             auto_offset_reset='earliest',
-                             value_deserializer=lambda x: json
-                             .loads(x.decode('utf-8')))
-    return consumer
-
-
 def get_data():
 
     res = requests.get('https://randomuser.me/api')
     res = res.json()
-    res = res['res'][0]
+    res = res['results'][0]
 
     return res
 
@@ -39,7 +30,7 @@ def format_data(res):
        {res['location']['street']['name']}"
     data["city"] = res['location']['city']
     data["country"] = res['location']['country']
-    data["postcode"] = int(res['location']['postcode'])
+    data["postcode"] = res['location']['postcode']
     data["latitude"] = float(res['location']['coordinates']['latitude'])
     data["longitude"] = float(res['location']['coordinates']['longitude'])
     data["email"] = res["email"]
@@ -55,3 +46,6 @@ def stream_data():
     data = format_data(res)
     producer.send(topic, value=data)
     producer.flush()
+
+
+stream_data()
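
create_kafka_consumer was deleted, but a throwaway consumer is still handy for checking that stream_data() (now also executed on import by the trailing call) actually publishes. A sketch along the lines of the removed helper, updated to the new in-network broker address; from the host you would use localhost:29092 instead, and the topic name is taken from spark_processing.py's subscribe option:

# Throwaway consumer mirroring the deleted create_kafka_consumer helper.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'user_data',
    bootstrap_servers=['kafka:9092'],   # in-network listener after this commit
    auto_offset_reset='earliest',
    consumer_timeout_ms=10_000,         # stop iterating after 10 s of silence
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
)
for message in consumer:
    print(message.value)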

dags/spark_processing.py (+16 −16)

@@ -13,15 +13,13 @@
 def create_spark_session():
     try:
         spark = SparkSession \
-            .builder \
-            .appName('SparkStructuredStreaming') \
-            .config("spark.jars.packages",
-                    "com.datastax.spark:spark-cassandra-connector_2.12:3.4.0, \
-                    org.apache.spark:spark-sql-kafka-0-10_2.13:3.4.2") \
-            .config("spark.cassandra.connection.auth.username", "cassandra") \
-            .config("spark.cassandra.connection.auth.password", "cassandra") \
-            .config("spark.cassandra.connection.port", 9042) \
-            .getOrCreate()
+                .builder \
+                .appName('SparkStructuredStreaming') \
+                .config('spark.cassandra.connection.host', 'cassandra') \
+                .config("spark.cassandra.connection.port", 9042) \
+                .config("spark.cassandra.auth.username", "cassandra") \
+                .config("spark.cassandra.auth.password", "cassandra") \
+                .getOrCreate()
         spark.sparkContext.setLogLevel("ERROR")
         logging.info('Spark session created successfully')
     except Exception:
@@ -36,7 +34,7 @@ def create_initial_dataframe(spark_session):
     df = spark_session \
         .readStream \
         .format("kafka") \
-        .option("kafka.bootstrap.servers", "kafka:29092") \
+        .option("kafka.bootstrap.servers", "kafka:9092") \
         .option("subscribe", "user_data") \
         .option("delimeter", ",") \
         .option("startingOffsets", "earliest") \
@@ -50,9 +48,6 @@ def create_initial_dataframe(spark_session):
 
 
 def create_final_dataframe(df, spark_session):
-    """
-    Modifies the initial dataframe, and creates the final dataframe.
-    """
     schema = StructType([
         StructField("full_name", StringType(), False),
         StructField("gender", StringType(), False),
@@ -68,15 +63,20 @@ def create_final_dataframe(df, spark_session):
     df = df.selectExpr("CAST(value AS STRING)") \
         .select(from_json(col("value"), schema).alias("data")) \
         .select("data.*")
-    print(df)
+    return df
 
 
 def start_streaming(df):
     logging.info("Streaming started")
     my_query = (df.writeStream
-                .format("com.apache.spark.sql.cassandra")
+                .format("org.apache.spark.sql.cassandra")
                 .outputMode("append")
-                .options(table="random_names", keyspace="spark_streaming"))
+                .option("checkpointLocation", "/opt/spark/checkpoint")
+                .options(table="random_names", keyspace="spark_streaming")
+                .option("kafka.request.timeout.ms", "30000")
+                .option("kafka.retry.backoff.ms", "500")
+                .option("kafka.session.timeout.ms", "60000")
+                .start())
 
     return my_query.awaitTermination()
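
Two things follow from this file's changes. First, with spark.jars.packages gone from the session builder, the Kafka and Cassandra connector jars now have to reach the job some other way (e.g. spark-submit --packages). Second, the append sink assumes spark_streaming.random_names already exists in Cassandra; the kafka.* options attached to the writeStream are read-side Kafka settings and are presumably ignored by the Cassandra sink. A one-time bootstrap sketch using the cassandra-driver package; the column list is inferred from format_data in kafka_stream.py and the partial schema above, and the primary key choice is an assumption:

# Hypothetical one-time setup for the sink table; not part of this commit.
from cassandra.cluster import Cluster

cluster = Cluster(['cassandra'], port=9042)   # service name from docker-compose.yaml
session = cluster.connect()

session.execute("""
    CREATE KEYSPACE IF NOT EXISTS spark_streaming
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
""")
session.execute("""
    CREATE TABLE IF NOT EXISTS spark_streaming.random_names (
        full_name text PRIMARY KEY,  -- assumed key; adjust to the real model
        gender    text,
        city      text,
        country   text,
        postcode  text,              -- kept as text now that int() was dropped
        latitude  double,
        longitude double,
        email     text
    )
""")
cluster.shutdown()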

docker-compose.yaml (+11 −6)

@@ -250,6 +250,8 @@ services:
       - airflow-network
     ports:
       - "2181:2181"
+    volumes:
+      - zookeeper-data:/data
     environment:
       ZOOKEEPER_CLIENT_PORT: 2181
       ZOOKEEPER_TICK_TIME: 2000
@@ -268,18 +270,20 @@ services:
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
+      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092, PLAINTEXT_HOST://0.0.0.0:29092
       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
 
   spark:
     image: apache/spark:3.4.2-python3
-    container_name: spark_master
+    container_name: spark
     ports:
-      - 8085:8080
+      - 8085:8085
     working_dir: /opt/spark
-    command: /opt/spark/sbin/start-master.sh
+    command: >
+      /opt/spark/sbin/start-master.sh
     environment:
-      SPARK_UI_PORT: 8080
+      SPARK_UI_PORT: 8085
       SPARK_MODE: master
       SPARK_RPC_AUTHENTICATION_ENABLED: no
       SPARK_RPC_ENCRYPTION_ENABLED: no
@@ -316,7 +320,8 @@ volumes:
     name: cassandra-data
   spark-data:
     name: spark-data
-
+  zookeeper-data:
+    name: zookeeper-data
 networks:
   airflow-network:
     name: airflow-network
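
The listener swap is the companion to the kafka:9092 changes in the Python files: containers on airflow-network now reach the broker as kafka:9092, while anything on the host uses localhost:29092. A minimal host-side check, assuming kafka-python is installed locally:

# Host-side connectivity check against the PLAINTEXT_HOST listener.
from kafka import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers='localhost:29092')
print(admin.list_topics())   # should include 'user_data' once the producer has run
admin.close()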

requirements.txt (+1 −1)

@@ -1,3 +1,3 @@
 psycopg2
 kafka-python
-apache-airflow==2.9.0
+pyspark==3.4.2
