A distributed task execution system implemented in Java using Apache ZooKeeper for master-worker coordination, featuring event-driven task queue management, fault-tolerant worker load balancing, and multi-threaded middleware support for TCP/IP and RMI protocols.
This framework implements a distributed computing system where tasks are submitted by clients, coordinated by a master node, and executed by worker nodes. The system uses Apache ZooKeeper for distributed coordination, ensuring fault tolerance, service discovery, and efficient task distribution across multiple worker nodes.
- Master-Worker Architecture: Automatic master election using ZooKeeper ephemeral znodes
- Event-Driven Task Management: Real-time task assignment using ZooKeeper watchers
- Fault Tolerance: Automatic worker failure detection and task reassignment
- Load Balancing: Intelligent task distribution across idle workers
- Multi-Protocol Support: Middleware server supporting both TCP/IP and RMI protocols
- Service Discovery: Dynamic worker registration and discovery using ephemeral znodes
- Automated Deployment: Shell scripts for ZooKeeper cluster setup and management
┌──────────┐         ┌──────────────┐         ┌──────────┐
│  Client  │◄───────►│    Master    │◄───────►│  Worker  │
│          │         │ (ZooKeeper)  │         │          │
└──────────┘         └──────────────┘         └──────────┘
                            │
                            ▼
                     ┌──────────────┐
                     │  ZooKeeper   │
                     │   Cluster    │
                     └──────────────┘
The system uses the following ZooKeeper znode hierarchy:
/dist27/
├── /master                  # Master election node
├── /tasks/                  # Task submission queue
│   ├── task-0000000001      # Sequential task nodes
│   ├── task-0000000002
│   └── ...
├── /workers/                # Worker registration
│   ├── worker-id-1          # Ephemeral worker nodes
│   ├── worker-id-2
│   └── ...
└── /idle/                   # Idle worker tracking
    ├── worker-id-1          # Ephemeral idle nodes
    └── ...
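As a small illustration of the hierarchy above, the following sketch builds these znode paths in Java. The class `ZnodePaths` and its method names are hypothetical and do not come from the repository. Placing the result node under the `/dist27` root is also an assumption. The 10-digit, zero-padded suffix mirrors how ZooKeeper names sequential znodes.

```java
import java.util.Locale;

// Hypothetical helper (not from the repository) for the znode layout shown above.
final class ZnodePaths {
    static final String ROOT = "/dist27";
    static final String MASTER = ROOT + "/master";
    static final String TASKS = ROOT + "/tasks";
    static final String WORKERS = ROOT + "/workers";
    static final String IDLE = ROOT + "/idle";

    private ZnodePaths() {}

    // ZooKeeper appends a 10-digit, zero-padded counter to sequential znodes.
    static String task(int sequence) {
        return String.format(Locale.ROOT, "%s/task-%010d", TASKS, sequence);
    }

    static String worker(String workerId) {
        return WORKERS + "/" + workerId;
    }

    static String idle(String workerId) {
        return IDLE + "/" + workerId;
    }

    // Assumption: the result node lives directly under the task node.
    static String result(int sequence) {
        return task(sequence) + "/result";
    }
}
```

For example, `ZnodePaths.task(1)` yields `/dist27/tasks/task-0000000001`.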
- Master Node:
  - First process to successfully create the /master znode
  - Monitors /tasks for new task submissions
  - Tracks idle workers via /idle znodes
  - Assigns tasks to available workers
- Worker Nodes:
  - Register themselves under /workers with unique IDs
  - Create ephemeral /idle nodes when available
  - Watch for task assignments
  - Execute tasks and write results to /tasks/{task-id}/result
- Client:
  - Submits tasks by creating sequential nodes under /tasks
  - Retrieves results from /tasks/{task-id}/result
- Language: Java (80.3%)
- Coordination: Apache ZooKeeper 3.6.2
- Protocols: TCP/IP, RMI (Java Remote Method Invocation)
- Build: Shell scripts for compilation and deployment
- Infrastructure: Multi-server ZooKeeper cluster
- Java Development Kit (JDK) 8 or higher
- Apache ZooKeeper 3.6.2 or compatible version
- Multiple servers for ZooKeeper cluster (minimum 3 for production)
- Network connectivity between all nodes
- Clone the repository
  git clone https://github.com/priyavrat7/Distributed-Task-Execution-Framework.git
  cd Distributed-Task-Execution-Framework
- Set up ZooKeeper cluster
  Update available_servers.txt with your server addresses:
  server.1=hostname1:22299:22399
  server.2=hostname2:22299:22399
  server.3=hostname3:22299:22399
- Configure ZooKeeper
  Copy and customize the ZooKeeper configuration:
  cp zoo-base.cfg zoo.cfg
  # Edit zoo.cfg with your server addresses
- Initialize ZooKeeper data directories
  ./server-setup.sh
- Start ZooKeeper cluster
  ./server-start.sh
- Compile the Java code
  ./compile.sh
- Start Master/Worker nodes
  # On each node, run the Java application
  # The first node to start becomes the master
  java -cp <classpath> MainClass
- Submit tasks via client
  ./client-create-nodes.sh
- Stop the system
  ./server-stop.sh
The system automatically handles master election:
- Each node attempts to create the /master znode
- The first successful node becomes the master
- Other nodes become workers
- If master fails, workers can re-elect a new master
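The election rule above amounts to an atomic "create if absent" on a single path. The sketch below models that rule in memory with `ConcurrentHashMap.putIfAbsent` rather than a real ZooKeeper session, so the class `ElectionSketch` and its methods are hypothetical stand-ins. With the actual ZooKeeper client, the equivalent step would be creating the master znode with `CreateMode.EPHEMERAL` and treating `KeeperException.NodeExistsException` as "another node is already master".

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory stand-in for the znode tree; this is NOT the ZooKeeper client API.
final class ElectionSketch {
    private final ConcurrentMap<String, String> znodes = new ConcurrentHashMap<>();

    // Returns true if nodeId created /master first and is therefore the master.
    boolean tryBecomeMaster(String nodeId) {
        return znodes.putIfAbsent("/master", nodeId) == null;
    }

    // Models the ephemeral znode disappearing when its owner's session ends.
    // Only the current owner's expiry removes the node.
    void sessionExpired(String nodeId) {
        znodes.remove("/master", nodeId);
    }

    String currentMaster() {
        return znodes.get("/master");
    }

    public static void main(String[] args) {
        ElectionSketch zk = new ElectionSketch();
        System.out.println(zk.tryBecomeMaster("node-a")); // true: first creator wins
        System.out.println(zk.tryBecomeMaster("node-b")); // false: /master already exists
        zk.sessionExpired("node-a");                      // master fails
        System.out.println(zk.tryBecomeMaster("node-b")); // true: re-election succeeds
    }
}
```

In a real deployment the losing nodes would also set a watch on the master znode so they are notified when it disappears; the sketch leaves that notification out.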
Tasks are submitted by creating sequential znodes under /tasks:
- Each task contains serialized task data
- Master monitors new tasks via watchers
- Tasks are assigned to idle workers automatically
- Workers register with unique IDs (format: PORT@LAB)
- Idle workers create ephemeral nodes under /idle
- Master assigns tasks by updating worker znode data
- Deleting a worker's idle node signals that worker to start executing its assigned task
- Results are written to /tasks/{task-id}/result
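The matching of pending tasks to idle workers described above can be sketched as a pair of queues drained together. This is an in-memory model only: it makes no ZooKeeper calls, the class `AssignmentSketch` and its members are hypothetical, and the first-in, first-out ordering of both queues is an assumption rather than something the source states.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// In-memory model of the master's matching logic; no ZooKeeper calls are made.
final class AssignmentSketch {
    private final Queue<String> pendingTasks = new ArrayDeque<>();
    private final Queue<String> idleWorkers = new ArrayDeque<>();
    // worker id -> assigned task id (stands in for the worker znode's data)
    private final Map<String, String> assignments = new LinkedHashMap<>();

    // Called when a new task znode appears.
    void onTaskCreated(String taskId) {
        pendingTasks.add(taskId);
        assignTasksToIdleWorkers();
    }

    // Called when a worker (re)creates its idle znode.
    void onIdleWorkerAvailable(String workerId) {
        idleWorkers.add(workerId);
        assignTasksToIdleWorkers();
    }

    // Pair tasks and workers in FIFO order until either queue is empty.
    private void assignTasksToIdleWorkers() {
        while (!pendingTasks.isEmpty() && !idleWorkers.isEmpty()) {
            String task = pendingTasks.poll();
            // Removing the worker from the queue models deleting its idle znode.
            String worker = idleWorkers.poll();
            assignments.put(worker, task);
        }
    }

    Map<String, String> assignments() {
        return assignments;
    }

    int pendingCount() {
        return pendingTasks.size();
    }
}
```

Removing the worker from the idle queue in the same step as the assignment keeps one worker from receiving two tasks in a single pass, which is the role the idle znode deletion plays in the real system.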
Edit zoo-base.cfg or create zoo.cfg:
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2
server.1=hostname1:22299:22399
server.2=hostname2:22299:22399
server.3=hostname3:22299:22399

Update available_servers.txt with your cluster servers:
server.1-lab2-25.cs.mcgill.ca:22299:22399
server.2-lab2-26.cs.mcgill.ca:22299:22399
server.3-lab2-29.cs.mcgill.ca:22299:22399
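To make the structure of the `server.N=host:port:port` entries in zoo.cfg explicit, here is a minimal parsing sketch. In ZooKeeper's configuration, the first port carries quorum traffic between peers and the second is used for leader election. The class `ServerEntry` is hypothetical, and the parser covers only the three-field form shown in this README, not the optional role or client-port suffixes that ZooKeeper also accepts.

```java
// Hypothetical parser for one "server.N=host:peerPort:electionPort" line.
final class ServerEntry {
    final int id;
    final String host;
    final int peerPort;     // quorum traffic between ensemble members
    final int electionPort; // leader election traffic

    ServerEntry(int id, String host, int peerPort, int electionPort) {
        this.id = id;
        this.host = host;
        this.peerPort = peerPort;
        this.electionPort = electionPort;
    }

    static ServerEntry parse(String line) {
        String[] keyValue = line.trim().split("=", 2);
        if (keyValue.length != 2 || !keyValue[0].startsWith("server.")) {
            throw new IllegalArgumentException("Not a server entry: " + line);
        }
        String[] parts = keyValue[1].split(":");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected host:port:port in: " + line);
        }
        // NumberFormatException (an IllegalArgumentException) signals a bad id or port.
        return new ServerEntry(
                Integer.parseInt(keyValue[0].substring("server.".length())),
                parts[0],
                Integer.parseInt(parts[1]),
                Integer.parseInt(parts[2]));
    }
}
```

Parsing `server.1=hostname1:22299:22399` gives id 1, host `hostname1`, peer port 22299 and election port 22399. Other settings such as `clientPort=2181` are rejected with an `IllegalArgumentException`.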
Distributed-Task-Execution-Framework/
├── zk/                        # ZooKeeper related files
├── compile.sh                 # Compilation script
├── server-setup.sh            # ZooKeeper cluster setup
├── server-start.sh            # Start ZooKeeper servers
├── server-stop.sh             # Stop ZooKeeper servers
├── server-availability.sh     # Check server availability
├── client-create-nodes.sh     # Client task submission script
├── available_servers.txt      # Server configuration
├── zoo-base.cfg               # Base ZooKeeper configuration
├── zoo-base-source.cfg        # Source ZooKeeper configuration
├── README.md                  # This file
└── [Java source files]        # Main implementation
- Master Failure: Automatic re-election when master node fails
- Worker Failure: Ephemeral znodes automatically removed on worker failure
- Task Recovery: Master can reassign tasks from failed workers
- Network Partitions: ZooKeeper's majority quorum prevents split-brain; only the partition that can reach a majority of the ensemble keeps accepting writes
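The fault-tolerance properties above depend on the ZooKeeper ensemble keeping a majority of its servers reachable, which is why three servers is the usual minimum for production. The arithmetic can be sketched as follows; the class `QuorumMath` and its method names are hypothetical, introduced only for illustration.

```java
// Hypothetical helper showing majority-quorum arithmetic for an ensemble.
final class QuorumMath {
    private QuorumMath() {}

    // Smallest number of servers that forms a strict majority.
    static int quorumSize(int ensembleSize) {
        if (ensembleSize < 1) {
            throw new IllegalArgumentException("ensembleSize must be >= 1");
        }
        return ensembleSize / 2 + 1;
    }

    // Number of servers that may fail while a majority still remains.
    static int toleratedFailures(int ensembleSize) {
        return ensembleSize - quorumSize(ensembleSize);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.printf("servers=%d quorum=%d tolerated=%d%n",
                    n, quorumSize(n), toleratedFailures(n));
        }
    }
}
```

A three-server ensemble has a quorum of two and survives one failure. Adding a fourth server raises the quorum to three without tolerating any additional failures, which is why odd ensemble sizes are generally preferred.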
- Start ZooKeeper cluster on multiple servers
- Launch multiple worker nodes across different machines
- Submit tasks using the client script
- Monitor task execution via ZooKeeper CLI or Java API
- Test fault tolerance by killing master or worker nodes
// Pseudo-code
onTaskCreated() {
    taskQueue.add(newTask);
    assignTasksToIdleWorkers();
}

onIdleWorkerAvailable() {
    idleWorkerCache.update();
    assignTasksToIdleWorkers();
}

assignTasksToIdleWorkers() {
    while (!taskQueue.isEmpty() && !idleWorkerCache.isEmpty()) {
        task = taskQueue.poll();
        worker = idleWorkerCache.get();
        assignTask(worker, task);
        deleteIdleNode(worker);
    }
}

// Pseudo-code
onIdleNodeDeleted() {
    taskId = getTaskIdFromWorkerNode();
    task = getTaskData(taskId);
    result = executeTask(task);
    writeResult(taskId, result);
    recreateIdleNode();
}

This project was developed as part of COMP512 (Distributed Systems) coursework at McGill University.
Contributors:
- Priyavrat Dev Sharma
This project is part of academic coursework. Please refer to the license file for details.
- McGill University, Department of Computer Science
- Apache ZooKeeper project for its excellent distributed coordination framework
- Repository: https://github.com/priyavrat7/Distributed-Task-Execution-Framework
- Issues: Report a bug or request a feature
Note: This framework is designed for educational and research purposes. For production use, ensure proper security configurations and error handling.