Merged

Commits (23)
d386455
ENH: Add possibility to delay generating MISP Feed
kamil-certat Jun 27, 2024
d67d6f3
Add documentation. Fix code compatibility
kamil-certat Jul 3, 2024
920a07d
Fix spelling
kamil-certat Jul 3, 2024
59c3014
ENH: Add attribute mapping
kamil-certat Jul 4, 2024
b8b6061
ENH: Add support for creating separated MISP Events
kamil-certat Jul 4, 2024
baef444
FIX: Handle not existing fields with manual mapping
kamil-certat Jul 8, 2024
cff9efe
ENH: Add option to extend default info
kamil-certat Jul 9, 2024
f6869ab
Fix typos
kamil-certat Jul 9, 2024
bd7e0d1
ENH: add support for tagging
kamil-certat Jul 10, 2024
820bdec
Fix generating on restart
kamil-certat Jul 10, 2024
d05aceb
ENH: Add tagging, check, and improved docs
kamil-certat Jul 16, 2024
26f161f
DOC: Update documentation about CacheMixin
kamil-certat Jul 16, 2024
93790df
Adjust to pycodestyle
kamil-certat Jul 16, 2024
1a980e0
Fix typo
kamil-certat Jul 16, 2024
6577a90
Clean up imports in tests
kamil-certat Jul 16, 2024
0c1dcf1
Add option for flat structure
kamil-certat Oct 27, 2025
352c487
Add changing attribute type
kamil-certat Oct 27, 2025
6c5317e
Fix positional arguments
kamil-certat Oct 27, 2025
fbfdecc
Improved documentation, renamed parameters, lower-level cache API
kamil-certat Oct 28, 2025
370f3cc
Regenerate only modified events
kamil-certat Oct 23, 2025
d86b1e3
Update Changelog and fixes in docs
kamil-certat Oct 28, 2025
2ab3da3
Spelling & pycodestyle
kamil-certat Oct 28, 2025
388609a
Correct and extend test, automatically adjust reload delay, docs fixes
kamil-certat Oct 28, 2025
32 changes: 27 additions & 5 deletions docs/dev/bot-development.md
@@ -207,15 +207,37 @@ and provides the methods to cache key-value pairs:

and following methods to cache objects in a queue:

- `cache_put`
- `cache_pop`
- `cache_length`.
- `cache_lpush`
- `cache_rpop`
- `cache_llen`.

Caching key-value pairs and queue caching are two separate mechanisms. The first is designed
for arbitrary values; the second is focused on temporarily storing messages (but can handle other
data). Values cached by one are not visible to the other. For example, adding a key-value pair using
`cache_set`, it does not change the value from `cache_length`, and if adding an element using
`cache_put` you cannot use `check_exists` to look for it.
`cache_set` does not change the value returned by `cache_llen`, and after adding an element using
`cache_lpush` you cannot use `check_exists` to look it up.

When using queue-based caching, you have to serialize the object to a format accepted by Redis/Valkey,
the underlying storage. For example, to store a message in a queue using the bot ID as the key, you can
use code like:

```python
message = self.receive_message().to_dict(jsondict_as_string=True)
self.cache_lpush(self.bot_id, json.dumps(message))
```

and to retrieve a message from the cache:

```python
data = self.cache_rpop(self.bot_id)
if data is None:
return # handle empty cache
message = json.loads(data)
# to use it as Message object
message_obj = MessageFactory.from_dict(
message, harmonization=self.harmonization, default_type="Event"
)
```
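Because `cache_lpush` adds at the left end of the list and `cache_rpop` takes from the right end, the queue drains in arrival (FIFO) order. The round trip can be sketched with a minimal in-memory stand-in (a hypothetical `deque`-based fake instead of Redis/Valkey, since the real methods need a running broker):

```python
import json
from collections import deque

class FakeQueueCache:
    """In-memory stand-in for the CacheMixin queue methods (illustration only)."""
    def __init__(self):
        self._queues = {}

    def cache_lpush(self, key, value):
        q = self._queues.setdefault(key, deque())
        q.appendleft(value)            # LPUSH: add at the left end
        return len(q)                  # Redis returns the new list length

    def cache_rpop(self, key):
        q = self._queues.get(key)
        return q.pop() if q else None  # RPOP: take from the right end (oldest)

    def cache_llen(self, key):
        return len(self._queues.get(key, ()))

cache = FakeQueueCache()
for i in range(3):
    cache.cache_lpush("my-bot", json.dumps({"n": i}))

drained = []
while (raw := cache.cache_rpop("my-bot")) is not None:
    drained.append(json.loads(raw))
# drained is [{"n": 0}, {"n": 1}, {"n": 2}]: oldest message comes out first
```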

### Pipeline Interactions

57 changes: 50 additions & 7 deletions docs/user/bots.md
@@ -4772,6 +4772,7 @@ hour", string.
incoming messages until the given number of them is reached. Use it if your bot processes a high number
of messages and constant saving to disk is a problem. Reloading or restarting the bot, as well as generating
a new MISP event based on `interval_event`, triggers regenerating the MISP feed regardless of the cache size.
To ensure saving on reload without any delay, you should also set the `_sighup_delay` parameter.
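The decision this parameter controls can be sketched roughly as follows (a hypothetical simplification of the bot's logic, not its actual code; the helper name is invented):

```python
# Hypothetical sketch: when bulk_save_count is unset, every message triggers a
# save; otherwise the feed is only regenerated once the cache queue is long enough.
def should_regenerate(queue_length, bulk_save_count):
    if not bulk_save_count:        # feature disabled: save on every message
        return True
    return queue_length >= bulk_save_count
```

For example, with `bulk_save_count: 100` the feed is written out on the 100th queued message, not on each one.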

**`attribute_mapping`**

@@ -4790,7 +4791,7 @@ For example:

```yaml
attribute_mapping:
source.ip:
source.ip: {}
feed.name:
comment: event_description.text
destination.ip:
@@ -4800,9 +4801,9 @@
would create a MISP object with three attributes `source.ip`, `feed.name` and `destination.ip`
and set their values as in the IntelMQ event. In addition, the `feed.name` would have a comment
as given in the `event_description.text` from the IntelMQ event, and `destination.ip` would be set
as not usable for IDS.
as not usable for IDS. You can use the `type` key to overwrite the attribute type.
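The mapping semantics can be illustrated with a small sketch. The helper `build_attribute_kwargs` and its resolution rule (a parameter value naming an existing IntelMQ field is replaced by that field's value, anything else is taken literally) are assumptions for illustration, not the bot's actual code:

```python
# Hypothetical sketch of applying an attribute_mapping to one IntelMQ event.
attribute_mapping = {
    "source.ip": {},
    "feed.name": {"comment": "event_description.text"},
    "destination.ip": {"to_ids": False},
}
intelmq_event = {
    "source.ip": "198.51.100.1",
    "feed.name": "Example Feed",
    "event_description.text": "C2 sighting",
    "destination.ip": "203.0.113.7",
}

def build_attribute_kwargs(field, extra_params, event):
    kwargs = {"value": event[field]}
    for param, setting in extra_params.items():
        # strings naming another IntelMQ field resolve to that field's value;
        # literals (e.g. booleans) pass through unchanged
        if isinstance(setting, str) and setting in event:
            kwargs[param] = event[setting]
        else:
            kwargs[param] = setting
    return kwargs

attrs = {
    field: build_attribute_kwargs(field, extra, intelmq_event)
    for field, extra in attribute_mapping.items()
}
# attrs["feed.name"]["comment"] resolves to "C2 sighting";
# attrs["destination.ip"]["to_ids"] stays the literal False
```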

**`event_separator`**
**`grouping_key`**

(optional, string): If set to a field name from the IntelMQ event, the bot will work in parallel on several
events instead of saving all incoming messages to a single one. Each unique value from the field will
@@ -4814,17 +4815,17 @@ use the same MISP Event as long as it's allowed by the `interval_event`.

(optional, string): If set, the generated MISP Event will use it in the `info` field of the event,
in addition to the standard IntelMQ description with the time frame (you cannot remove it as the bot
depends of datetimes saved there). If you use `event_separator`, you may want to use `{separator}`
placeholder which will be then replaced with the value of the separator.
depends on datetimes saved there). If you use `grouping_key`, you may want to use the `{key}`
placeholder, which will then be replaced with the value of the grouping key.

For example, the following configuration can be used to create a MISP feed with IPs of C2 servers
of different botnets, keeping each botnet in a separate MISP Event with an appropriate description.
Each MISP Event will contain objects with the `source.ip` field only, and the events' info will look
like *C2 Servers for botnet-1. IntelMQ event 2024-07-09T14:51:10.825123 - 2024-07-10T14:51:10.825123*

```yaml
event_separator: malware.name
additional_info: C2 Servers for {separator}.
grouping_key: malware.name
additional_info: C2 Servers for {key}.
attribute_mapping:
source.ip:
```
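The resulting `info` string for one of these events is assembled roughly like this (a simplified sketch of the formatting done in the bot's `_generate_new_misp_event`; the timestamps are the example values from above):

```python
# Sketch: additional_info is formatted with the grouping key, then prepended
# to the standard IntelMQ time-frame description.
additional_info = "C2 Servers for {key}."
key = "botnet-1"  # a value of malware.name, the grouping key
base_info = "IntelMQ event 2024-07-09T14:51:10.825123 - 2024-07-10T14:51:10.825123"
info = f"{additional_info.format(key=key)} {base_info}"
```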
@@ -4863,6 +4864,48 @@ tagging:
- name: njrat
```

**`flat_events`**

(optional, bool): if enabled, the bot adds attributes directly to the MISP event instead of creating
a MISP object for every incoming IntelMQ message. Useful if you want to export just a list of data, e.g.
C2 domains, without having to group attributes together. Defaults to `False`.

**Example**

For example, if you have a source that sends C2 domains for multiple malware families,
you can use the following bot configuration:

```yaml
parameters:
destination_queues: {}
# you have to configure your webserver to expose this path for MISP
output_dir: "/var/lib/intelmq/bots/your_feed/"
misp_org_name: My Organisation
misp_org_uuid: Your-Org-UUID
interval_event: 1 day
grouping_key: "malware.name"
bulk_save_count: 100
additional_info: "{key} - "
flat_events: true
attribute_mapping:
source.fqdn:
comment: malware.name
type: domain
category: "Network activity"
to_ids: true
tagging:
__all__:
- name: tlp:amber
# ensure saving on reload
_sighup_delay: false
```

As a result, you will get a MISP feed that creates one event per malware family every day. Each event
will contain just C2 domains with the IDS flag set and the malware name as a comment. In addition, all
events will be tagged with `tlp:amber` and have the malware name in the event info, together with
the information about the time period. The MISP feed will be saved to disk after accumulating 100 C2
domains or on reload/restart.

**Usage in MISP**

Configure the destination directory of this bot as a feed in MISP, either as a local location or served via a web server.
42 changes: 22 additions & 20 deletions intelmq/bots/outputs/misp/output_feed.py
@@ -48,9 +48,10 @@ class MISPFeedOutputBot(OutputBot, CacheMixin):
additional_info: str = None

# An optional field used to create multiple MISP events from incoming messages
event_separator: str = None
grouping_key: str = None

# Optional non-standard mapping of message fields to MISP object attributes
# You can overwrite the attribute type by providing the 'type' argument
# The structure is like:
# {<IDF_field_name = MISP_object_relation>: {<dict of additional parameters for MISPObjectAttribute>}}
# For example:
@@ -60,7 +61,7 @@ class MISPFeedOutputBot(OutputBot, CacheMixin):
attribute_mapping: dict = None

# Optional definition to add tags to the MISP event. It should be a dict where keys are
# '__all__' (to add tags for every event) or, if the event_separator is used, the separator
# '__all__' (to add tags for every event) or, if the grouping_key is used, the key
# values. For each key, there should be a list of dicts defining parameters for the MISPTag
# object, but only the "name" is required to set.
# For example:
@@ -114,7 +115,7 @@ def init(self):
with (self.output_dir / ".current").open() as f:
current = f.read()

if not self.event_separator:
if not self.grouping_key:
self.current_files[DEFAULT_KEY] = Path(current)
else:
self.current_files = {
@@ -146,9 +147,9 @@ def init(self):
self._tagging_objects[key].append(tag)

# Ensure we do generate feed on reload / restart, so awaiting messages won't wait forever
if self.cache_length() and not getattr(self, "testing", False):
if (length := self.cache_llen(self.bot_id)) and not getattr(self, "testing", False):
self.logger.debug(
"Found %s awaiting messages. Generating feed.", self.cache_length()
"Found %s awaiting messages. Generating feed.", length
)
self._generate_misp_feed()

@@ -181,7 +182,7 @@ def process(self):

cache_size = None
if self.bulk_save_count:
cache_size = self.cache_put(event)
cache_size = self.cache_lpush(self.bot_id, json.dumps(event))

if cache_size is None:
self._generate_misp_feed(event)
@@ -208,7 +209,7 @@ def _generate_new_misp_event(self, key):
end=self.max_time_current.isoformat(),
)
if self.additional_info:
info = f"{self.additional_info.format(separator=key)} {info}"
info = f"{self.additional_info.format(key=key)} {info}"

self.current_events[key].info = info
self.current_events[key].set_date(datetime.date.today())
@@ -218,7 +219,7 @@
self.output_dir / f"{self.current_events[key].uuid}.json"
)
with (self.output_dir / ".current").open("w") as f:
if not self.event_separator:
if not self.grouping_key:
f.write(str(self.current_files[key]))
else:
json.dump({k: str(v) for k, v in self.current_files.items()}, f)
@@ -229,10 +230,10 @@ def _add_message_to_misp_event(self, message: dict):
message_obj = MessageFactory.from_dict(
message, harmonization=self.harmonization, default_type="Event"
)
if not self.event_separator:
if not self.grouping_key:
key = DEFAULT_KEY
else:
key = message_obj.get(self.event_separator) or DEFAULT_KEY
key = message_obj.get(self.grouping_key) or DEFAULT_KEY

if key in self.current_events:
event = self.current_events[key]
@@ -295,10 +296,11 @@ def _generate_misp_feed(self, message: dict = None):
if message:
self._add_message_to_misp_event(message)

message = self.cache_pop()
while message:
cached_msg = self.cache_rpop(self.bot_id)
while cached_msg:
message = json.loads(cached_msg)
self._add_message_to_misp_event(message)
message = self.cache_pop()
cached_msg = self.cache_rpop(self.bot_id)

for key, event in self.current_events.items():
feed_output = event.to_feed(with_meta=False)
@@ -340,15 +342,15 @@ def check(parameters):
)

sanity_event = Event({})
event_separator = parameters.get("event_separator")
grouping_key = parameters.get("grouping_key")
if (
event_separator and not
sanity_event._Message__is_valid_key(event_separator)[0]
grouping_key and not
sanity_event._Message__is_valid_key(grouping_key)[0]
):
results.append(
[
"error",
f"Value {event_separator} in 'event_separator' is not a valid event key.",
f"Value {grouping_key} in 'grouping_key' is not a valid event key.",
]
)

@@ -408,20 +410,20 @@ def check(parameters):
"error",
(
"Parameter 'tagging' has to be a dictionary with keys as '__all__' "
"or possible 'event_separator' values. Each dictionary value " +
"or possible 'grouping_key' values. Each dictionary value " +
tagging_error,
),
]
)
else:
if not event_separator and (
if not grouping_key and (
"__all__" not in tagging or len(tagging.keys()) > 1
):
results.append(
[
"error",
(
"Tagging configuration expects custom values, but the 'event_separator'"
"Tagging configuration expects custom values, but the 'grouping_key'"
" parameter is not set. If you want to just tag all events, use only"
" the '__all__' key."
),
22 changes: 9 additions & 13 deletions intelmq/lib/mixins/cache.py
@@ -21,9 +21,9 @@ class CacheMixin:
cache_set

To store serialized elements in a cache queue under a given key, use methods:
cache_put
cache_pop
cache_length
cache_lpush
cache_rpop
cache_llen
"""

__redis: redis.Redis = None
@@ -67,19 +67,15 @@ def cache_set(self, key: str, value: Any, ttl: Optional[int] = None):
if self.redis_cache_ttl:
self.__redis.expire(key, self.redis_cache_ttl)

def cache_put(self, value: dict) -> int:
def cache_lpush(self, key: str, value: Any) -> int:
# Returns the length of the list after pushing
size = self.__redis.lpush(self.bot_id, json.dumps(value))
return size
return self.__redis.lpush(key, value)

def cache_length(self) -> int:
return self.__redis.llen(self.bot_id)
def cache_llen(self, key: str) -> int:
return self.__redis.llen(key)

def cache_pop(self) -> dict:
data = self.__redis.rpop(self.bot_id)
if data is None:
return None
return json.loads(data)
def cache_rpop(self, key: str) -> Any:
    return self.__redis.rpop(key)

def cache_flush(self):
"""