diff --git a/intelmq_webinput_csv/bin/backend.py b/intelmq_webinput_csv/bin/backend.py index 1be85a8b..415a8793 100644 --- a/intelmq_webinput_csv/bin/backend.py +++ b/intelmq_webinput_csv/bin/backend.py @@ -9,6 +9,11 @@ import logging import os +import functools +import concurrent.futures + +from typing import Union, Tuple + import dateutil.parser from flask import Flask, jsonify, make_response, request @@ -18,7 +23,7 @@ from intelmq.lib.message import Event, MessageFactory from intelmq.lib.pipeline import PipelineFactory from intelmq.lib.exceptions import InvalidValue, KeyExists -from intelmq.lib.utils import RewindableFileHandle +from intelmq.lib.utils import RewindableFileHandle, load_configuration from intelmq_webinput_csv.version import __version__ @@ -195,6 +200,68 @@ def handle_extra(value: str) -> dict: value = {'data': value} return value +def serialize_event(data: Tuple[dict, str], harmonization: dict, parameters: dict, + raw_header: list, time_observation: str) -> Union[Tuple[str, str], Tuple[None, None]]: + """ + Serialize an CSV line into IntelMQ raw message + + Parameters: + data: tuple of CSV line (parsed, raw) + harmonization: dict of Harmonization config + parameters: dict of form parameters + raw_header: list of raw header + time_observation: str of time of observation + + Returns: + (queue, raw_message) or (None, None) if error occured + """ + queue = "" + (line, raw) = data + event = Event(harmonization=harmonization) + + try: + for columnindex, (column, value) in \ + enumerate(zip(parameters['columns'], line)): + if not column or not value: + continue + if column.startswith('time.'): + parsed = dateutil.parser.parse(value, fuzzy=True) + if not parsed.tzinfo: + value += parameters['timezone'] + parsed = dateutil.parser.parse(value) + value = parsed.isoformat() + if column == 'extra': + value = handle_extra(value) + event.add(column, value) + for key, value in parameters.get('constant_fields', {}).items(): + if key not in event: + event.add(key, value) + for key, value in request.form.items(): + if not key.startswith('custom_'): + continue + key = key[7:] + if key not in event: + event.add(key, value) + if CONFIG.get('destination_pipeline_queue_formatted', False): + queue = CONFIG['destination_pipeline_queue'].format(ev=event) + except Exception: + app.logger.exception('Failure') + return (None, None) + + if 'classification.type' not in event: + event.add('classification.type', parameters['classification.type']) + if 'classification.identifier' not in event: + event.add('classification.identifier', parameters['classification.identifier']) + if 'feed.code' not in event: + event.add('feed.code', parameters['feed.code']) + if 'time.observation' not in event: + event.add('time.observation', time_observation, sanitize=False) + + if 'raw' not in event: + event.add('raw', ''.join(raw_header + [raw])) + + return (queue, MessageFactory.serialize(event)) + @app.route('/') def form(): @@ -404,6 +471,10 @@ def submit(): successful_lines = 0 raw_header = [] + + # Ensure Harmonization config is only loaded once + harmonization = load_configuration(HARMONIZATION_CONF_FILE) + with open(tmp_file[0], encoding='utf8') as handle: handle_rewindable = RewindableFileHandle(handle) reader = csv.reader(handle_rewindable, delimiter=parameters['delimiter'], @@ -416,51 +487,31 @@ def submit(): raw_header.append(handle_rewindable.current_line) for _ in range(parameters['skipInitialLines']): next(reader) - for lineindex, line in enumerate(reader): - event = Event() - try: - for columnindex, (column, value) in \ - enumerate(zip(parameters['columns'], line)): - if not column or not value: - continue - if column.startswith('time.'): - parsed = dateutil.parser.parse(value, fuzzy=True) - if not parsed.tzinfo: - value += parameters['timezone'] - parsed = dateutil.parser.parse(value) - value = parsed.isoformat() - if column == 'extra': - value = handle_extra(value) - event.add(column, value) - for key, value in parameters.get('constant_fields', {}).items(): - if key not in event: - event.add(key, value) - for key, value in request.form.items(): - if not key.startswith('custom_'): - continue - key = key[7:] - if key not in event: - event.add(key, value) - if CONFIG.get('destination_pipeline_queue_formatted', False): - queue_name = CONFIG['destination_pipeline_queue'].format(ev=event) - destination_pipeline.set_queues(queue_name, "destination") - destination_pipeline.connect() - except Exception: - app.logger.exception('Failure') - continue - if 'classification.type' not in event: - event.add('classification.type', parameters['classification.type']) - if 'classification.identifier' not in event: - event.add('classification.identifier', parameters['classification.identifier']) - if 'feed.code' not in event: - event.add('feed.code', parameters['feed.code']) - if 'time.observation' not in event: - event.add('time.observation', time_observation, sanitize=False) - if 'raw' not in event: - event.add('raw', ''.join(raw_header + [handle_rewindable.current_line])) - raw_message = MessageFactory.serialize(event) - destination_pipeline.send(raw_message) - successful_lines += 1 + + # Generator func for retrieving parsed & raw line in single tuple + generator = ((entry, handle_rewindable.current_line) for entry in reader) + + # Parallelize serialization over all available cores + with concurrent.futures.ProcessPoolExecutor() as executor: + part_serialize_event = functools.partial(serialize_event, harmonization=harmonization, + parameters=parameters, raw_header=raw_header, + time_observation=time_observation) + + future = executor.map(part_serialize_event, generator, chunksize=1_000) + + # Loop through results + for (queue_name, raw_message) in future: + + # If queue_name specified, alternate pipeline should be used + if queue_name: + destination_pipeline.set_queues(queue_name, "destination") + destination_pipeline.connect() + + # If no error occured + if raw_message: + destination_pipeline.send(raw_message) + successful_lines += 1 + return create_response('Successfully processed %s lines.' % successful_lines)