Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion scripts/define_routes.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import List, Tuple, Type
from src.api import RoutesHandler
from src.bots import PongBot, FlappybirdBot, SkiJumpBot, HappyJumpBot
from src.bots import PongBot, FlappybirdBot, SkiJumpBot, HappyJumpBot, PacManBot
from src.handlers import AiHandler
from src.agents.web_pong import PongAgent
from src.agents.web_flappy_bird import FlappyBirdAgent
Expand Down Expand Up @@ -73,19 +73,26 @@ def define_routes() -> List[Tuple[str, Type, dict]]:
)),
]

pacman_routes = [
(r"/ws/pacman/pacman-bot/", PacManBot)
]

pong_endpoint = (r"/ws/pong/routes/", RoutesHandler, dict(routes=pong_routes))
flappybird_endpoint = (r"/ws/flappybird/routes/", RoutesHandler, dict(routes=flappybird_routes))
skijump_endpoint = (r"/ws/skijump/routes/", RoutesHandler, dict(routes=skijump_routes))
happyjump_endpoint = (r"/ws/happyjump/routes/", RoutesHandler, dict(routes=happyjump_routes))
pacman_endpoint = (r"/ws/pacman/routes/", RoutesHandler, dict(routes=pacman_routes))

routes += pong_routes
routes += flappybird_routes
routes += skijump_routes
routes += happyjump_routes
routes += pacman_routes

routes.append(pong_endpoint)
routes.append(flappybird_endpoint)
routes.append(skijump_endpoint)
routes.append(happyjump_endpoint)
routes.append(pacman_endpoint)

return routes
9 changes: 4 additions & 5 deletions src/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,19 @@ def on_message(self, message):
self.last_message_time = time.time()

game_state = json.loads(message)
self.process_game_state(game_state)
# self.process_game_state(game_state)

move = self.choose_move(game_state)
self.write_message(json.dumps(move))

@abstractmethod
def process_game_state(self, game_state):
pass
# @abstractmethod
# def process_game_state(self, game_state):
# pass

@abstractmethod
def choose_move(self, data: dict) -> dict:
raise NotImplementedError

@abstractmethod
def after_close(self):
pass

Expand Down
130 changes: 128 additions & 2 deletions src/bots.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json

from src.handlers import BaseHandler
import random
import heapq
from collections import deque


class PongBot(BaseHandler):
Expand Down Expand Up @@ -103,3 +104,128 @@ def choose_move(self, data: dict):
jump = 1

return {'jump': jump, 'move': move}


class PacManBot(BaseHandler):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.visited_positions = set()
self.path = []
self.current_target = None
self.recent_positions = deque(maxlen=6)
self.walls_initialized = False
self.last_game_started = False
self.last_level = None
self.safe_distance = 1.75
self.game_map = None
self.is_power_mode_on = False
self.enemies = None
self.directions = {1: (0, -1), 2: (0, 1), 3: (-1, 0), 4: (1, 0)}

def reset_state(self) -> None:
self.visited_positions.clear()
self.path.clear()
self.current_target = None
self.recent_positions.clear()
self.walls_initialized = False
self.game_map = None
self.is_power_mode_on = False
self.enemies = None

def _check_game_end(self, state: dict) -> bool:
game_started = state.get('isGameStarted', False)
if self.last_game_started and not game_started:
return True
self.last_game_started = game_started
return False

def is_walkable(self, walkable_row: int, walkable_column: int) -> bool:
return (0 <= walkable_row < len(self.game_map) and 0 <= walkable_column < len(self.game_map[0])
and self.game_map[walkable_row][walkable_column] != 1)

def is_safe(self, safe_row: int, safe_column: int, enemies: list) -> bool:
if not self.is_walkable(safe_row, safe_column):
return False
if self.is_power_mode_on:
return True
return all(abs(enemy_row - safe_row) + abs(enemy_column - safe_column) > self.safe_distance
for enemy_row, enemy_column in enemies)

def dijkstra_find_path(self, start_row: int, start_column: int) -> list:
pq = [(0, start_row, start_column, [])]
visited = set()
while pq:
cost, dij_row, dij_column, path = heapq.heappop(pq)
if (dij_row, dij_column) in visited:
continue
visited.add((dij_row, dij_column))
if ((dij_row, dij_column) not in self.visited_positions
and self.is_safe(dij_row, dij_column, self.enemies)):
self.current_target = (dij_row, dij_column)
return path
for dij_move, (direction_row, direction_column) in self.directions.items():
next_row, next_column = dij_row + direction_row, dij_column + direction_column
if ((next_row, next_column) not in visited and
self.is_walkable(next_row, next_column)
and self.is_safe(next_row, next_column, self.enemies)):
heapq.heappush(pq, (cost + 1, next_row, next_column, path + [dij_move]))
return []

def choose_move(self, data: dict):
state = data['state']

if self._check_game_end(state):
self.reset_state()

self.game_map = state['map']
tile_size = state['tileSize']
game_started = state.get('isGameStarted', False)
current_level = state.get('level')

if self.last_level is not None and current_level != self.last_level:
self.reset_state()
self.last_level = current_level

if not game_started:
return {"move": 1}

pacman_x, pacman_y = state['pacmanX'], state['pacmanY']
pacman_row, pacman_col = int(pacman_y // tile_size), int(pacman_x // tile_size)

self.recent_positions.append((pacman_row, pacman_col))

self.is_power_mode_on = state.get('isPowerMode', False)
self.enemies = [(int(enemy['y'] // tile_size), int(enemy['x'] // tile_size))
for enemy in state.get('enemies', [])]

if not self.walls_initialized:
for r, row in enumerate(self.game_map):
for c, tile in enumerate(row):
if tile == 1:
self.visited_positions.add((r, c))
self.walls_initialized = True

self.visited_positions.add((pacman_row, pacman_col))

if len(self.recent_positions) >= 3:
last3 = list(self.recent_positions)[-3:]
if last3.count(last3[0]) == 3:
self.path.clear()
self.current_target = None

if self.current_target is None or self.current_target == (pacman_row, pacman_col) or not self.path:
self.current_target = None
self.path = self.dijkstra_find_path(pacman_row, pacman_col)

if self.path:
move = self.path.pop(0)
return {'move': move}

possible_moves = [possible_move for possible_move, (direction_row, direction_column)
in self.directions.items() if self.is_safe(pacman_row + direction_row,
pacman_col + direction_column, self.enemies)]
if possible_moves:
return {'move': random.choice(possible_moves)}

return {'move': 0}
3 changes: 2 additions & 1 deletion src/handlers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from src.api import BaseHandler
from src.agents.web_env import WebsocketAgent
from collections import deque
from typing import Callable
from typing import Callable, final

import numpy as np
import json
Expand All @@ -15,6 +15,7 @@ def initialize(
):
self.agent = agent

@final
def after_close(self):
self.agent.states.clear()

Expand Down