Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion scripts/define_routes.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import List, Tuple, Type
from src.api import RoutesHandler
from src.bots import PongBot, FlappybirdBot, SkiJumpBot, HappyJumpBot
from src.bots import PongBot, FlappybirdBot, SkiJumpBot, HappyJumpBot, PacManBot
from src.handlers import AiHandler
from src.agents.web_pong import PongAgent
from src.agents.web_flappy_bird import FlappyBirdAgent
Expand Down Expand Up @@ -73,19 +73,26 @@ def define_routes() -> List[Tuple[str, Type, dict]]:
)),
]

pacman_routes = [
(r"/ws/pacman/pacman-bot/", PacManBot)
]

pong_endpoint = (r"/ws/pong/routes/", RoutesHandler, dict(routes=pong_routes))
flappybird_endpoint = (r"/ws/flappybird/routes/", RoutesHandler, dict(routes=flappybird_routes))
skijump_endpoint = (r"/ws/skijump/routes/", RoutesHandler, dict(routes=skijump_routes))
happyjump_endpoint = (r"/ws/happyjump/routes/", RoutesHandler, dict(routes=happyjump_routes))
pacman_endpoint = (r"/ws/pacman/routes/", RoutesHandler, dict(routes=pacman_routes))

routes += pong_routes
routes += flappybird_routes
routes += skijump_routes
routes += happyjump_routes
routes += pacman_routes

routes.append(pong_endpoint)
routes.append(flappybird_endpoint)
routes.append(skijump_endpoint)
routes.append(happyjump_endpoint)
routes.append(pacman_endpoint)

return routes
9 changes: 4 additions & 5 deletions src/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,19 @@ def on_message(self, message):
self.last_message_time = time.time()

game_state = json.loads(message)
self.process_game_state(game_state)
# self.process_game_state(game_state)

move = self.choose_move(game_state)
self.write_message(json.dumps(move))

@abstractmethod
def process_game_state(self, game_state):
pass
# @abstractmethod
# def process_game_state(self, game_state):
# pass

@abstractmethod
def choose_move(self, data: dict) -> dict:
raise NotImplementedError

@abstractmethod
def after_close(self):
pass

Expand Down
113 changes: 113 additions & 0 deletions src/bots.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import json

from src.handlers import BaseHandler
import random
import heapq
from collections import deque


class PongBot(BaseHandler):
Expand Down Expand Up @@ -103,3 +106,113 @@ def choose_move(self, data: dict):
jump = 1

return {'jump': jump, 'move': move}


class PacManBot(BaseHandler):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.visited_positions = set()
self.path = []
self.current_target = None
self.recent_positions = deque(maxlen=6)
self.walls_initialized = False
self.last_game_started = False
self.last_level = None

def reset_state(self):
self.visited_positions.clear()
self.path.clear()
self.current_target = None
self.recent_positions.clear()
self.walls_initialized = False

def _check_game_end(self, state):
game_started = state.get('isGameStarted', False)
if self.last_game_started and not game_started:
return True
self.last_game_started = game_started
return False

def choose_move(self, data: dict):
state = data['state']

if self._check_game_end(state):
self.reset_state()

game_map = state['map']
tile_size = state['tileSize']
game_started = state.get('isGameStarted', False)
current_level = state.get('level')

if self.last_level is not None and current_level != self.last_level:
self.reset_state()
self.last_level = current_level

if not game_started:
return {"move": 1}

pacman_x, pacman_y = state['pacmanX'], state['pacmanY']
pacman_row, pacman_col = int(pacman_y // tile_size), int(pacman_x // tile_size)

directions = {1: (0, -1), 2: (0, 1), 3: (-1, 0), 4: (1, 0)}
self.recent_positions.append((pacman_row, pacman_col))

power_mode = state.get('isPowerMode', False)
enemies = [(int(e['y'] // tile_size), int(e['x'] // tile_size)) for e in state.get('enemies', [])]

if not self.walls_initialized:
for r, row in enumerate(game_map):
for c, tile in enumerate(row):
if tile == 1:
self.visited_positions.add((r, c))
self.walls_initialized = True

self.visited_positions.add((pacman_row, pacman_col))

def is_walkable(r, c):
return 0 <= r < len(game_map) and 0 <= c < len(game_map[0]) and game_map[r][c] != 1

def is_safe(r, c):
if not is_walkable(r, c):
return False
if power_mode:
return True
return all(abs(er - r) + abs(ec - c) > 1.5 for er, ec in enemies)

def dijkstra_find_path(sr, sc):
pq = [(0, sr, sc, [])]
visited = set()
while pq:
cost, r, c, path = heapq.heappop(pq)
if (r, c) in visited:
continue
visited.add((r, c))
if (r, c) not in self.visited_positions and is_safe(r, c):
self.current_target = (r, c)
return path
for move, (dr, dc) in directions.items():
nr, nc = r + dr, c + dc
if (nr, nc) not in visited and is_walkable(nr, nc) and is_safe(nr, nc):
heapq.heappush(pq, (cost + 1, nr, nc, path + [move]))
return []

if len(self.recent_positions) >= 3:
last3 = list(self.recent_positions)[-3:]
if last3.count(last3[0]) == 3:
self.path.clear()
self.current_target = None

if self.current_target is None or self.current_target == (pacman_row, pacman_col) or not self.path:
self.current_target = None
self.path = dijkstra_find_path(pacman_row, pacman_col)

if self.path:
move = self.path.pop(0)
return {'move': move}

possible_moves = [m for m, (dr, dc) in directions.items() if is_safe(pacman_row + dr, pacman_col + dc)]
if possible_moves:
return {'move': random.choice(possible_moves)}

return {'move': 0}
3 changes: 2 additions & 1 deletion src/handlers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from src.api import BaseHandler
from src.agents.web_env import WebsocketAgent
from collections import deque
from typing import Callable
from typing import Callable, final

import numpy as np
import json
Expand All @@ -15,6 +15,7 @@ def initialize(
):
self.agent = agent

@final
def after_close(self):
self.agent.states.clear()

Expand Down