diff --git a/cli_client/python/timesketch_cli_client/commands/importer.py b/cli_client/python/timesketch_cli_client/commands/importer.py index 90d4cb5bce..f280cdf2c5 100644 --- a/cli_client/python/timesketch_cli_client/commands/importer.py +++ b/cli_client/python/timesketch_cli_client/commands/importer.py @@ -21,16 +21,20 @@ @click.command('import') +@click.option( + '--event-filter', default=None, + help='Optional event filter to pass to psort.') @click.option('--name', help='Name of the timeline.') @click.option( '--timeout', type=int, default=600, help='Seconds to wait for indexing.') @click.argument('file_path', type=click.Path(exists=True)) @click.pass_context -def importer(ctx, name, timeout, file_path): +def importer(ctx, event_filter, name, timeout, file_path): """Import timeline. Args: ctx: Click CLI context object. + event_filter: Event filter to pass to psort. name: Name of the timeline to create. timeout: Seconds to wait for indexing. file_path: File path to the file to import. @@ -48,7 +52,12 @@ def importer(ctx, name, timeout, file_path): # TODO: Consider using the whole command as upload context instead # of the file path. streamer.set_upload_context(file_path) + streamer.set_event_filter(event_filter) + + # Note that the event filter must be set before _upload_binary_file() + # is invoked by add_file(). 
streamer.add_file(file_path) + timeline = streamer.timeline if not timeline: click.echo('Error creating timeline, please try again.') diff --git a/importer_client/python/timesketch_import_client/importer.py b/importer_client/python/timesketch_import_client/importer.py index 1f2da51b1e..027a847cbf 100644 --- a/importer_client/python/timesketch_import_client/importer.py +++ b/importer_client/python/timesketch_import_client/importer.py @@ -26,8 +26,8 @@ import numpy import pandas -from timesketch_api_client import timeline from timesketch_api_client import definitions +from timesketch_api_client import timeline from timesketch_import_client import utils logger = logging.getLogger('timesketch_importer.importer') @@ -62,6 +62,7 @@ def __init__(self): self._data_lines = [] self._data_type = None self._datetime_field = None + self._event_filter = None self._format_string = None self._index = '' self._last_response = None @@ -373,6 +374,9 @@ def _upload_binary_file(self, file_path): 'provider': self._provider, 'data_label': self._data_label, } + if self._event_filter: + data['event_filter'] = self._event_filter + if self._index: data['index_name'] = self._index @@ -579,7 +583,7 @@ def add_excel_file(self, filepath, **kwargs): self.add_data_frame(data_frame) def add_file(self, filepath, delimiter=','): - """Add a CSV, JSONL or a PLASO file to the buffer. + """Add a CSV, JSONL or a Plaso storage file to the buffer. Args: filepath: the path to the file to add. 
@@ -614,6 +618,7 @@ def add_file(self, filepath, delimiter=','): fh, delimiter=delimiter, chunksize=self._threshold_entry): self.add_data_frame(chunk_frame, part_of_iter=True) + elif file_ending == 'plaso': self._upload_binary_file(filepath) @@ -733,6 +738,10 @@ def set_entry_threshold(self, threshold): """Set the threshold for number of entries per chunk.""" self._threshold_entry = threshold + def set_event_filter(self, event_filter): + """Set the event filter to pass to psort.""" + self._event_filter = event_filter + def set_filesize_threshold(self, threshold): """Set the threshold for file size per chunk.""" self._threshold_filesize = threshold @@ -807,13 +816,12 @@ def timeline(self): logger.warning('No timeline ID has been stored as of yet.') return None - timeline_obj = timeline.Timeline( + return timeline.Timeline( timeline_id=self._timeline_id, sketch_id=self._sketch.id, api=self._sketch.api, name=self._timeline_name, searchindex=self._index) - return timeline_obj def __enter__(self): """Make it possible to use "with" statement.""" diff --git a/timesketch/api/v1/resources/upload.py b/timesketch/api/v1/resources/upload.py index 47b3a2b38d..c587ac236b 100644 --- a/timesketch/api/v1/resources/upload.py +++ b/timesketch/api/v1/resources/upload.py @@ -227,16 +227,19 @@ def _upload_and_index( db_session.add(timeline) db_session.commit() + event_filter = form.get('event_filter', None) + sketch_id = sketch.id # Start Celery pipeline for indexing and analysis. # Import here to avoid circular imports. 
# pylint: disable=import-outside-toplevel from timesketch.lib import tasks pipeline = tasks.build_index_pipeline( - file_path=file_path, events=events, timeline_name=timeline_name, - index_name=searchindex.index_name, file_extension=file_extension, - sketch_id=sketch_id, only_index=enable_stream, - timeline_id=timeline.id) + event_filter=event_filter, events=events, + file_extension=file_extension, file_path=file_path, + index_name=searchindex.index_name, only_index=enable_stream, + sketch_id=sketch_id, timeline_id=timeline.id, + timeline_name=timeline_name) task_id = uuid.uuid4().hex pipeline.apply_async(task_id=task_id) diff --git a/timesketch/lib/tasks.py b/timesketch/lib/tasks.py index 75470991b3..dafee75203 100644 --- a/timesketch/lib/tasks.py +++ b/timesketch/lib/tasks.py @@ -201,61 +201,53 @@ def _set_timeline_status(timeline_id, status, error_msg=None): db_session.commit() -def _get_index_task_class(file_extension): - """Get correct index task function for the supplied file type. - - Args: - file_extension (str): File type based on filename extension. - - Returns: - A task function. - - Raises: - KeyError if no task class can be found. - """ - if file_extension == 'plaso': - index_class = run_plaso - elif file_extension in ['csv', 'jsonl']: - index_class = run_csv_jsonl - else: - raise KeyError('No task that supports {0:s}'.format(file_extension)) - return index_class - - def build_index_pipeline( - file_path='', events='', timeline_name='', index_name='', - file_extension='', sketch_id=None, only_index=False, timeline_id=None): + event_filter=None, events='', file_extension='', file_path='', + index_name='', only_index=False, sketch_id=None, timeline_id=None, + timeline_name=''): """Build a pipeline for index and analysis. Args: - file_path: The full path to a file to upload, either a file_path or - or events need to be defined. + event_filter: Event filter to pass to psort. 
events: String with the event data, either file_path or events needs to be defined. - timeline_name: Name of the timeline to create. - index_name: Name of the index to index to. file_extension: The file extension of the file. - sketch_id: The ID of the sketch to analyze. + file_path: The full path to a file to upload, either a file_path or + events needs to be defined. + index_name: Name of the index to index to. only_index: If set to true then only indexing tasks are run, not analyzers. This is to be used when uploading data in chunks, we don't want to run the analyzers until all chunks have been uploaded. + sketch_id: The ID of the sketch to analyze. timeline_id: Optional ID of the timeline object this data belongs to. + timeline_name: Name of the timeline to create. Returns: Celery chain with indexing task (or single indexing task) and analyzer task group. + + Raises: + RuntimeError: if no file path or events were specified. """ if not (file_path or events): raise RuntimeError( 'Unable to upload data, missing either a file or events.') - index_task_class = _get_index_task_class(file_extension) + + if file_extension not in ('csv', 'jsonl', 'plaso'): + raise KeyError('No task that supports {0:s}'.format(file_extension)) + sketch_analyzer_chain = None searchindex = SearchIndex.query.filter_by(index_name=index_name).first() - index_task = index_task_class.s( - file_path, events, timeline_name, index_name, file_extension, - timeline_id) + if file_extension in ('csv', 'jsonl'): + index_task = run_csv_jsonl.s( + file_path, events, timeline_name, index_name, file_extension, + timeline_id) + else: + index_task = run_plaso.s( + file_path, event_filter, timeline_name, index_name, file_extension, + timeline_id) # TODO: Check if a scenario is set or an investigative question # is in the sketch, and then enable data finder on the newly @@ -496,38 +488,36 @@ def run_sketch_analyzer( @celery.task(track_started=True, base=SqlAlchemyTask) def run_plaso( - file_path, events, 
timeline_name, index_name, source_type, timeline_id): + file_path, event_filter, timeline_name, index_name, source_type, + timeline_id): """Create a Celery task for processing Plaso storage file. Args: file_path: Path to the plaso file on disk. - events: String with event data, invalid for plaso files. + event_filter: Event filter to pass to psort. timeline_name: Name of the Timesketch timeline. index_name: Name of the datastore index. source_type: Type of file, csv or jsonl. timeline_id: ID of the timeline object this data belongs to. - Raises: - RuntimeError: If the function is called using events, plaso - is not installed or is of unsupported version. Returns: Name (str) of the index. + + Raises: + RuntimeError: If Plaso is not installed or is of unsupported version. """ if not plaso: raise RuntimeError( 'Plaso isn\'t installed, unable to continue processing plaso ' 'files.') - plaso_version = int(plaso.__version__) + plaso_version = int(plaso.__version__, 10) if plaso_version <= PLASO_MINIMUM_VERSION: raise RuntimeError( 'Plaso version is out of date (version {0:d}, please upgrade to a ' 'version that is later than {1:d}'.format( plaso_version, PLASO_MINIMUM_VERSION)) - if events: - raise RuntimeError('Plaso uploads needs a file, not events.') - event_type = 'generic_event' # Document type for Elasticsearch mappings = None @@ -593,10 +583,9 @@ def run_plaso( psort_path = 'psort.py' cmd = [ - psort_path, '-o', 'elastic_ts', file_path, '--server', elastic_server, + psort_path, '-o', 'elastic_ts', '--server', elastic_server, '--port', str(elastic_port), '--status_view', 'none', - '--index_name', index_name, - ] + '--index_name', index_name] if mappings_file_path: cmd.extend(['--elastic_mappings', mappings_file_path]) @@ -620,6 +609,11 @@ def run_plaso( if psort_memory: cmd.extend(['--process_memory_limit', str(psort_memory)]) + cmd.append(file_path) + + if event_filter: + cmd.append(event_filter) + # Run psort.py try: subprocess.check_output(