Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 111 additions & 22 deletions omnibot/processor.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if event_type == "message" or event_type == "app_mention" looks like it opens up for duplicates

Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,25 @@
import importlib
import json
import re
from typing import Any
from typing import Mapping

import requests

from omnibot import logging
from omnibot import settings
from omnibot.services import slack
from omnibot.services import stats
from omnibot.services.slack import get_bot_info
from omnibot.services.slack import get_message
from omnibot.services.slack import parser
from omnibot.services.slack.base_message import BaseMessage
from omnibot.services.slack.bot import Bot
from omnibot.services.slack.interactive_component import InteractiveComponent
from omnibot.services.slack.message import Message
from omnibot.services.slack.message import MessageUnsupportedError
from omnibot.services.slack.reaction import Reaction
from omnibot.services.slack.reaction import ReactionUnsupportedError
from omnibot.services.slack.slash_command import SlashCommand
from omnibot.services.slack.team import Team
from omnibot.utils import get_callback_id
Expand All @@ -42,30 +49,65 @@ def process_event(event):
)
statsd.incr(f"event.process.attempt.{event_type}")
if event_type == "message" or event_type == "app_mention":
try:
with statsd.timer("process_event"):
logger.debug(
f"Processing message: {json.dumps(event, indent=2)}",
extra=event_trace,
)
try:
message = Message(bot, event_info, event_trace)
_process_message_handlers(message)
except MessageUnsupportedError:
pass
except Exception:
statsd.incr(f"event.process.failed.{event_type}")
logger.exception(
"Could not process message.",
exc_info=True,
extra=event_trace,
)
_process_message_event(bot, event_info, event_trace, event_type)
elif event_type == "reaction_added" or event_type == "reaction_removed":
_process_reaction_event(bot, event_info, event_trace, event_type)
else:
logger.debug("Event is not a message type.", extra=event_trace)
logger.debug("Event is not a message or reaction type.", extra=event_trace)
logger.debug(event)
Comment on lines +56 to 57
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why double log?



def _process_message_handlers(message):
def _process_message_event(bot, event_info, event_trace, event_type):
"""
Process message or app_mention events.
"""
statsd = stats.get_statsd_client()
try:
with statsd.timer("process_event"):
logger.debug(
f"Processing event: {json.dumps(event_info, indent=2)}",
extra=event_trace,
)
try:
message = Message(bot, event_info, event_trace)
_process_message_message_handlers(message)
except MessageUnsupportedError:
pass
except Exception:
statsd.incr(f"event.process.failed.{event_type}")
logger.exception(
"Could not process message event.",
exc_info=True,
extra=event_trace,
)


def _process_reaction_event(bot, event_info, event_trace, event_type):
"""
Process reaction_added or reaction_removed events.
"""
statsd = stats.get_statsd_client()
try:
with statsd.timer("process_event"):
logger.debug(
f"Processing event: {json.dumps(event_info, indent=2)}",
extra=event_trace,
)
try:
reaction = Reaction(bot, event_info, event_trace)
_process_reaction_message_handlers(reaction)
except ReactionUnsupportedError:
pass
except Exception:
statsd.incr(f"event.process.failed.{event_type}")
logger.exception(
"Could not process reaction event.",
exc_info=True,
extra=event_trace,
)
Comment on lines +60 to +107
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe better as a decorator



def _process_message_message_handlers(message: Message):
bot = message.bot
statsd = stats.get_statsd_client()
command_matched = False
Expand Down Expand Up @@ -101,6 +143,53 @@ def _process_message_handlers(message):
_handle_help(message)


def _process_reaction_message_handlers(reaction: Reaction):
bot = reaction.bot
statsd = stats.get_statsd_client()
handler_called = False
item_channel = reaction.item_channel
item_ts = reaction.item_ts

if not _is_message_from_bot(bot, item_channel, item_ts):
statsd.incr("event.ignored")
return

for handler in bot.message_handlers:
if handler.get("match_type") == "reaction":
match = bool(re.fullmatch(handler["match"], reaction.reaction))
regex_should_not_match = handler.get("regex_type") == "absence"
# A matched regex should callback only if the regex is supposed to
# match. An unmatched regex should callback only if the regex is
# not supposed to match.
if match != regex_should_not_match:
reaction.set_match("reaction", handler["match"])
for callback in handler["callbacks"]:
_handle_message_callback(reaction, callback)
handler_called = True

if handler_called:
statsd.incr("event.handled")
elif not handler_called:
logger.debug("no handler found")
statsd.incr("event.ignored")


def _is_message_from_bot(bot: Bot, channel: str, ts: str):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment why this is necessary vs just looking up bot from lookup table of registered handlers

"""
Some events, like reactions, do not have all the ids we need to determine who wrote the message.
"""
message = get_message(bot, channel, ts)
if not message or "bot_id" not in message:
logger.warning("Failed to retrieve valid message or 'bot_id' is missing")
return False
# There can be multiple bot_ids for the same bot
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any documentation?

bot_info = get_bot_info(bot, message["bot_id"])
if not bot_info or bot_info["app_id"] != bot.bot_id:
logger.debug("Reaction is not on a message from this bot")
return False
return True


def process_slash_command(command):
"""
Dispatcher for slack slash commands.
Expand Down Expand Up @@ -257,7 +346,7 @@ def parse_kwargs(kwargs, bot, event_trace=None):
kwargs[attr] = parser.unextract_users(kwargs[attr], bot)


def _handle_post_message(message, kwargs):
def _handle_post_message(message: BaseMessage, kwargs):
try:
channel = kwargs.pop("channel")
except KeyError:
Expand Down Expand Up @@ -330,7 +419,7 @@ def _handle_action(action, container, kwargs):
)


def _handle_message_callback(message, callback):
def _handle_message_callback(message: BaseMessage, callback: Mapping[str, Any]):
logger.info(
'Handling callback for message: match_type="{}" match="{}"'.format(
message.match_type,
Expand Down
82 changes: 82 additions & 0 deletions omnibot/services/slack/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,88 @@ def get_channel(bot, channel):
return {}


def get_message(
bot,
channel: str,
timestamp: str,
):
"""
Get a message, from its channel and timestamp
"""
logger.debug(
"Fetching message",
extra=merge_logging_context(
{"channel": channel, "timestamp": timestamp},
bot.logging_context,
),
)
redis_client = omniredis.get_redis_client()
hash_key = f"message:{bot.team.name}:{channel}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is key by channel or message? If it's by channel why prefixed as message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the hash key is by channel, message just indicates the data type, this pattern is used in other slack functions

message = redis_client.hget(hash_key, timestamp)
if message:
return json.loads(message)
data = client(bot).api_call(
"conversations.history",
channel=channel,
latest=timestamp,
limit=1,
inclusive=True,
include_all_metadata=True,
)
if data.get("ok") and data.get("messages"):
redis_client.hset(hash_key, timestamp, json.dumps(data["messages"][0]))
return data["messages"][0]
else:
logger.warning(
"Call to conversations.history failed.",
extra=merge_logging_context(
{"channel": channel, "timestamp": timestamp},
_get_failure_context(data),
bot.logging_context,
),
)
return None


def get_bot_info(
bot,
bot_id: str,
):
"""
Get bot info, from its bot id
"""
logger.debug(
"Fetching bot info",
extra=merge_logging_context(
{"bot_id": bot_id},
bot.logging_context,
),
)
redis_client = omniredis.get_redis_client()
hash_key = f"bots.info:{bot.team.name}"
info = redis_client.hget(hash_key, bot_id)
if info:
return json.loads(info)

data = client(bot).api_call(
"bots.info",
bot=bot_id,
)
if data.get("ok") and data.get("bot"):
redis_client.hset(hash_key, bot_id, json.dumps(data["bot"]))
return data["bot"]
else:
logger.warning(
"Call to bots.info failed.",
extra=merge_logging_context(
{"bot_id": bot_id},
_get_failure_context(data),
bot.logging_context,
),
)
return None


def _get_channel_name_from_cache(key, bot_name, value):
redis_client = omniredis.get_redis_client()
ret = redis_client.hget(f"{key}:{bot_name}", value)
Expand Down
107 changes: 107 additions & 0 deletions omnibot/services/slack/base_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
from omnibot import logging
from omnibot.services import slack
from omnibot.services.slack.bot import Bot

logger = logging.getLogger(__name__)


class BaseMessage:
"""
Base class for representing a parsed slack event message.
"""

def __init__(
self,
bot: Bot,
event: dict,
event_trace: dict,
omnibot_payload_type: str,
):
self._event_trace = event_trace
self.event = event
self._match = None
self._payload = {}
self._payload["omnibot_payload_type"] = omnibot_payload_type
self._bot = bot
# The bot object has data we don't want to pass to downstreams, so
# in the payload, we just store specific bot data.
self._payload["bot"] = {"name": bot.name, "bot_id": bot.bot_id}
# For future safety sake, we'll do the same for the team.
self._payload["team"] = {"name": bot.team.name, "team_id": bot.team.team_id}
self._payload["user"] = event.get("user")
if self.user:
self._payload["parsed_user"] = slack.get_user(self.bot, self.user)
elif self.bot_id:
# TODO: call get_bot
self._payload["parsed_user"] = None
else:
self._payload["parsed_user"] = None

@property
def bot(self):
"""
The bot associated with the app that received this message from the
event subscription api. To get info about a bot that may have sent
this message, see bot_id.
"""
return self._bot

@property
def bot_id(self):
"""
The bot_id associated with the message, if the message if from a bot.
If this message isn't from a bot, this will return None.
"""
return self.event.get("bot_id")

@property
def event_trace(self):
return self._event_trace

@property
def team(self):
return self._payload["team"]

@property
def payload(self):
return self._payload

@property
def user(self):
return self._payload["user"]

@property
def channel_id(self):
return self._payload.get("channel_id")

@property
def channel(self):
return self._payload.get("channel", {})

@property
def ts(self):
"""
The timestamp of the event.
"""
return self._payload["ts"]

@property
def match_type(self):
return self._payload.get("match_type")

@property
def match(self):
return self._match

@property
def command_text(self):
return self._payload.get("command_text")

def set_match(self, match_type: str, match: str):
self._payload["match_type"] = match_type
self._match = match
if match_type == "command":
self._payload["command"] = match
self._payload["args"] = self.command_text[len(match):].strip() # fmt: skip
elif match_type == "regex" or match_type == "reaction":
self._payload["regex"] = match
Loading