89x issue5191 #582

Open · wants to merge 19 commits into base: dev · Changes from all commits
2 changes: 1 addition & 1 deletion iotfunctions/__init__.py
@@ -11,5 +11,5 @@
import os
import pkgutil

__version__ = '8.8.0'
__version__ = '8.9.19'
__all__ = list(module for (_, module, _) in pkgutil.iter_modules([os.path.dirname(__file__)]))
59 changes: 33 additions & 26 deletions iotfunctions/anomaly.py
@@ -18,6 +18,7 @@
import logging
import time
import hashlib # encode feature names
import traceback

import numpy as np
import pandas as pd
@@ -97,6 +98,10 @@
Saliency_normalizer = 1
Generalized_normalizer = 1 / 300

# Do away with numba logs
numba_logger = logging.getLogger('numba')
numba_logger.setLevel(logging.ERROR)

# from
# https://stackoverflow.com/questions/44790072/sliding-window-on-time-series-data
def view_as_windows1(temperature, length, step):
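Editorial aside between hunks: view_as_windows1 builds overlapping windows over a one-dimensional series, following the linked Stack Overflow approach. Below is a minimal sketch of the same effect using numpy's built-in sliding_window_view (available in numpy >= 1.20); it is illustrative only, not the function's actual implementation.

import numpy as np

temperature = np.arange(10.0)          # 1-D series, as in the anomaly functions
length, step = 4, 2                    # window length and stride
windows = np.lib.stride_tricks.sliding_window_view(temperature, length)[::step]
print(windows)
# [[0. 1. 2. 3.]
#  [2. 3. 4. 5.]
#  [4. 5. 6. 7.]
#  [6. 7. 8. 9.]]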
@@ -261,27 +266,6 @@ def transform_spectral_residual(self, values):
return spectral_residual


def merge_score(dfEntity, dfEntityOrig, column_name, score, mindelta):
"""
Fit interpolated score to original entity slice of the full dataframe
"""

# equip score with time values, make sure it's positive
score[score < 0] = 0
dfEntity[column_name] = score

# merge
dfEntityOrig = pd.merge_asof(dfEntityOrig, dfEntity[column_name], left_index=True, right_index=True,
direction='nearest', tolerance=mindelta)

if column_name + '_y' in dfEntityOrig:
merged_score = dfEntityOrig[column_name + '_y'].to_numpy()
else:
merged_score = dfEntityOrig[column_name].to_numpy()

return merged_score


#######################################################################################
# Scalers
#######################################################################################
@@ -563,7 +547,8 @@ def prepare_data(self, dfEntity):

# interpolate gaps - data imputation
try:
dfe = dfe.dropna(subset=[self.input_item]).interpolate(method="time")
#dfe = dfe.dropna(subset=[self.input_item]).interpolate(method="time")
dfe = dfe.interpolate(method="time")
except Exception as e:
logger.error('Prepare data error: ' + str(e))

@@ -611,10 +596,15 @@ def _calc(self, df):

# remove all rows with only null entries
dfe = dfe_orig.dropna(how='all')
logger.info('Anomaly ' + str(df[self.output_items[0]].values.shape) + ', ' +
str(dfe_orig[self.output_items[0]].values.shape) + ', ' +
str(dfe[self.output_items[0]].values.shape))

# minimal time delta for merging
mindelta, dfe_orig = min_delta(dfe_orig)

logger.info('Anomaly II ' + str(dfe_orig[self.output_items[0]].values.shape))

logger.debug('Timedelta:' + str(mindelta) + ' Index: ' + str(dfe_orig.index))

# one dimensional time series - named temperature for catchiness
@@ -658,8 +648,25 @@ def _calc(self, df):
linear_interpolate = sp.interpolate.interp1d(time_series_temperature, scores[i], kind='linear',
fill_value='extrapolate')

zScoreII = merge_score(dfe, dfe_orig, output_item,
abs(linear_interpolate(np.arange(0, temperature.size, 1))), mindelta)
# stretch anomaly score to fit temperature.size
score = abs(linear_interpolate(np.arange(0, temperature.size, 1)))

# and make sure it's positive
score[score < 0] = 0

dfe[output_item] = score

# merge so that data is stretched to match the original data w/o gaps and NaNs
dfe_orig = pd.merge_asof(dfe_orig, dfe[output_item], left_index=True, right_index=True,
direction='nearest', tolerance=mindelta)

if output_item + '_y' in dfe_orig:
zScoreII = dfe_orig[output_item + '_y'].to_numpy()
else:
zScoreII = dfe_orig[output_item].to_numpy()

logger.debug('Merge Score : ' + str(score.shape) + ', ' + str(zScoreII.shape))

# fast path - either cut off or just copy
elif diff < 0:
zScoreII = scores[i][0:temperature.size]
@@ -669,12 +676,12 @@ def _calc(self, df):
# make sure shape is correct
try:
df[output_item] = zScoreII
except Exception as e2:
except Exception as e2:
df[output_item] = zScoreII.reshape(-1,1)
pass

except Exception as e:
logger.error(self.whoami + ' score integration failed with ' + str(e))
logger.error(self.whoami + ' score integration failed with ' + str(e) + '\n' + traceback.format_exc())

logger.debug('--->')
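The inlined merge above replaces the removed merge_score helper: the interpolated score is clipped to be non-negative, attached to the gap-free slice, and then aligned back onto the original index with pd.merge_asof, using the minimal time delta as tolerance. A minimal, self-contained sketch of that alignment (all names and timestamps here are illustrative):

import numpy as np
import pandas as pd

# original entity slice (with gaps) and the dense, gap-free index used for scoring
orig_index = pd.to_datetime(['2022-01-01 00:00', '2022-01-01 00:03', '2022-01-01 00:07'])
dense_index = pd.date_range('2022-01-01 00:00', periods=8, freq='1min')

dfe = pd.DataFrame(index=dense_index)
dfe['score'] = np.abs(np.linspace(-1.0, 1.0, len(dense_index)))   # interpolated, non-negative score

dfe_orig = pd.DataFrame({'temperature': [20.0, 21.0, 19.5]}, index=orig_index)
mindelta = pd.Timedelta('1min')                                    # minimal time delta used as tolerance

# nearest-neighbour alignment within the tolerance, as in the code above
dfe_orig = pd.merge_asof(dfe_orig, dfe[['score']], left_index=True, right_index=True,
                         direction='nearest', tolerance=mindelta)
print(dfe_orig['score'].to_numpy())                                # one score per original timestamp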

33 changes: 28 additions & 5 deletions iotfunctions/db.py
@@ -503,6 +503,7 @@ def __init__(self, credentials=None, start_session=False, echo=False, tenant_id=
sqlalchemy_dialect_kwargs = {}

# Establish database connection via sqlalchemy
logger.info('Establishing database connection via SqlAlchemy.')
sqlalchemy_connection_kwargs = {**sqlalchemy_dialect_kwargs, **sqlalchemy_connection_kwargs}
self.connection = create_engine(sqlalchemy_connection_string, echo=echo, **sqlalchemy_connection_kwargs)

@@ -518,21 +519,22 @@ def __init__(self, credentials=None, start_session=False, echo=False, tenant_id=
else:
self.session = None
self.metadata = MetaData(self.connection)
logger.debug('Database connection via SqlAlchemy established.')
logger.info('Database connection via SqlAlchemy established.')

# Establish native database connection (for DB2 and PostgreSQL only)
logger.info('Establishing native database connection.')
if self.db_type == 'db2':
self.native_connection = ibm_db.connect(native_connection_string, '', '')
self.native_connection = self.connect_to_db2(native_connection_string)
self.native_connection_dbi = ibm_db_dbi.Connection(self.native_connection)
logger.debug('Native database connection to DB2 established.')
logger.info('Native database connection to DB2 established.')

elif self.db_type == 'postgresql':
cred = self.credentials['postgresql']
self.native_connection = psycopg2.connect(user=cred['username'], password=cred['password'],
host=cred['host'], port=cred['port'], database=cred['db'],
application_name="AS %s Native Connection" % self.application_name)
self.native_connection_dbi = self.native_connection
logger.debug('Native database connection to PostgreSQL established.')
logger.info('Native database connection to PostgreSQL established.')
else:
self.native_connection = None
self.native_connection_dbi = None
@@ -609,6 +611,27 @@ def __init__(self, credentials=None, start_session=False, echo=False, tenant_id=
else:
logger.info(f"Data Dictionary is not available.")

def connect_to_db2(self, native_connection_string):
time_out = pd.Timestamp.utcnow() + pd.Timedelta(value=45, unit='seconds')
connection_attempt = 0
connection = None
while connection is None and time_out > pd.Timestamp.utcnow():
connection_attempt += 1
if connection_attempt > 2:
# Delay execution of each attempt as follows (attempt/seconds): 1/0, 2/0, 3/4, 4/8, 5/16, 6/32
time.sleep(2**(connection_attempt-1))
try:
connection = ibm_db.connect(native_connection_string, '', '')
except Exception:
logger.error(f"Attempt #{connection_attempt} to connect to DB2 failed.", exc_info=True)

if connection is None:
raise ConnectionError(f"DB2 connection could not be established in {connection_attempt} attempts.")
else:
logger.debug(f"DB2 connection was established at attempt #{connection_attempt}.")

return connection

def _aggregate_item(self, table, column_name, aggregate, alias_column=None, dimension_table=None,
timestamp_col=None):

@@ -931,7 +954,7 @@ def get_entity_type(self, entity_type_id):
entity = md.EntityType(name=metadata['name'], db=self,
**{'auto_create_table': False, '_timestamp': timestamp, '_db_schema': schema,
'_entity_type_id': entity_type_id, '_dimension_table_name': dim_table,
'metric_table_name': metadata['metricTableName']})
'metric_table_name': metadata['metricTableName'], '_data_items': metadata.get('dataItemDto')})

return entity
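The new connect_to_db2 wraps ibm_db.connect in a bounded retry loop: up to 45 seconds overall, no delay on the first two attempts, then an exponentially growing sleep (4, 8, 16, 32 seconds). A minimal sketch of the same shape against a generic connect() callable, so it can be exercised without ibm_db (the helper name and timeout parameter are illustrative):

import logging
import time
import pandas as pd

logger = logging.getLogger(__name__)

def connect_with_retry(connect, timeout_seconds=45):
    deadline = pd.Timestamp.utcnow() + pd.Timedelta(value=timeout_seconds, unit='seconds')
    attempt = 0
    connection = None
    while connection is None and deadline > pd.Timestamp.utcnow():
        attempt += 1
        if attempt > 2:
            time.sleep(2 ** (attempt - 1))   # 4s, 8s, 16s, ... from the third attempt on
        try:
            connection = connect()
        except Exception:
            logger.error("Attempt #%d to connect failed.", attempt, exc_info=True)
    if connection is None:
        raise ConnectionError("Connection could not be established in %d attempts." % attempt)
    return connection

# usage, mirroring the PR: connect_with_retry(lambda: ibm_db.connect(native_connection_string, '', ''))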

64 changes: 49 additions & 15 deletions iotfunctions/metadata.py
@@ -1201,20 +1201,50 @@ def get_custom_calendar(self):

def get_data(self, start_ts=None, end_ts=None, entities=None, columns=None):

df = self.get_data_with_col_names(start_ts=start_ts, end_ts=end_ts, entities=entities, columns=columns)

# Replace the data frame's column names, which are actually the lower-case DB2 column names, with the data item names.
# Starting with Monitor 8.8, column names of raw metrics differ from the data item names (column name = data item name + event id).
# To provide backward compatibility we have to map all data item names of raw metrics to column names wherever they are used as column names.
# The backward compatibility only works when data item names are unique, i.e. a data item name must not be used in more than one event.
data_item_name_to_db_col_name = {}
db_col_name_to_data_item_name = {}
for data_item in self._data_items:
data_item_name = data_item.get('name')
if data_item.get('type') == 'METRIC' and data_item_name not in ['ENTITY_ID', 'RCV_TIMESTAMP_UTC']:
db_col_name_to_data_item_name[data_item.get('columnName').lower()] = data_item_name.lower()
if data_item.get('type') == 'METRIC':
if data_item_name_to_db_col_name.get(data_item_name) is None:
data_item_col_name = data_item.get('columnName')
data_item_name_to_db_col_name[data_item_name] = data_item_col_name
db_col_name_to_data_item_name[data_item_col_name] = data_item_name
else:
raise ValueError("Data item name %s is defined in multiple events and therefore not unique. This is currently not supported in this function." % data_item_name)

if columns is not None:
mapped_columns = [data_item_name_to_db_col_name.get(col_name, col_name) for col_name in columns]
else:
mapped_columns = None

if self._pre_agg_rules is not None:
agg_rules = {data_item_name_to_db_col_name.get(name, name): func_list for name, func_list in self._pre_agg_rules.items()}
else:
agg_rules = {}

agg_outputs = {}
if self._pre_agg_outputs is not None:
for name, output_list in self._pre_agg_outputs.items():
new_output_list = [data_item_name_to_db_col_name.get(output, output) for output in output_list]
agg_outputs[data_item_name_to_db_col_name.get(name, name)] = new_output_list

df = self.get_data_with_col_names(start_ts=start_ts, end_ts=end_ts, entities=entities, columns=mapped_columns, agg_rules=agg_rules, agg_outputs=agg_outputs)

# Column names of the data frame are either database column names or names defined in self._pre_agg_outputs. We mapped all data item names
# to db column names in self._pre_agg_outputs. Map db column names back to data item names.
df.rename(columns=db_col_name_to_data_item_name, inplace=True)

# Repeat mapping taking into account upper case to lower case conversion of sqlalchemy
tmp_mapping = {col_name.lower(): name for col_name, name in db_col_name_to_data_item_name.items()}
df.rename(columns=tmp_mapping, inplace=True)

return df

def get_data_with_col_names(self, start_ts=None, end_ts=None, entities=None, columns=None):
def get_data_with_col_names(self, start_ts=None, end_ts=None, entities=None, columns=None, agg_rules=None, agg_outputs=None):
"""
Retrieve entity data at input grain or preaggregated
"""
@@ -1250,26 +1280,26 @@ def get_data_with_col_names(self, start_ts=None, end_ts=None, entities=None, col

# make sure each column is in the aggregate dictionary
# apply a default aggregate for each column not specified in the aggregation metadata
if self._pre_agg_rules is None:
self._pre_agg_rules = {}
self._pre_agg_outputs = {}
if agg_rules is None:
agg_rules = {}
agg_outputs = {}
for c in columns:
try:
self._pre_agg_rules[c]
agg_rules[c]
except KeyError:
if c not in [self._timestamp, self._entity_id]:
if c in metrics:
self._pre_agg_rules[c] = 'mean'
self._pre_agg_outputs[c] = 'mean_%s' % c
agg_rules[c] = 'mean'
agg_outputs[c] = 'mean_%s' % c
else:
self._pre_agg_rules[c] = 'max'
self._pre_agg_outputs[c] = 'max_%s' % c
agg_rules[c] = 'max'
agg_outputs[c] = 'max_%s' % c
else:
pass

df = self.db.read_agg(table_name=self.name, schema=self._db_schema, groupby=[self._entity_id],
timestamp=self._timestamp, time_grain=self._pre_aggregate_time_grain,
agg_dict=self._pre_agg_rules, agg_outputs=self._pre_agg_outputs, start_ts=start_ts,
agg_dict=agg_rules, agg_outputs=agg_outputs, start_ts=start_ts,
end_ts=end_ts, entities=entities, dimension=self._dimension_table_name)

tw['pre-aggregeted'] = self._pre_aggregate_time_grain
@@ -1284,6 +1314,10 @@ def get_data_with_col_names(self, start_ts=None, end_ts=None, entities=None, col
memo = MemoryOptimizer()
df = memo.downcastNumeric(df)
try:
if self._entity_id in df.columns and self._df_index_entity_id not in df.columns:
df[self._df_index_entity_id] = df[self._entity_id]
if self._timestamp.lower() in df.columns:
df.rename(columns={self._timestamp.lower(): self._timestamp}, inplace=True)
df = self.index_df(df)
except (AttributeError, KeyError):
pass
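To make the backward-compatible renaming in get_data concrete: requested data item names are first mapped to DB column names for the query, and the result columns are then renamed back, with a second pass that catches the lower-case column names produced by sqlalchemy. A minimal sketch of that rename-back step (column and item names are made up):

import pandas as pd

db_col_name_to_data_item_name = {'TEMPERATURE_EV1': 'temperature'}

df = pd.DataFrame({'temperature_ev1': [21.5, 22.0]})   # sqlalchemy returned the column lower-cased

df.rename(columns=db_col_name_to_data_item_name, inplace=True)    # exact-case mapping (no match here)
tmp_mapping = {col.lower(): name for col, name in db_col_name_to_data_item_name.items()}
df.rename(columns=tmp_mapping, inplace=True)                       # lower-case fallback
print(df.columns.tolist())                                         # ['temperature']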
8 changes: 4 additions & 4 deletions tests/test_base_functions.py
@@ -49,10 +49,10 @@ def test_base_functions():
df_i['Test2'] = df_i[Temperature] + addl
df_i['Test3'] = df_i[Temperature] + addl
df_i['Test4'] = df_i[Temperature] + addl
df_i['Test1'][3] = None
df_i['Test2'][2] = None
df_i['Test2'][3] = None
df_i['Test3'][1] = None
df_i['Test1'][3] = np.nan
df_i['Test2'][2] = np.nan
df_i['Test2'][3] = np.nan
df_i['Test3'][1] = np.nan
df_i['Test4'][1] = 10000.0
df_i['Test4'][3] = 20000.0

14 changes: 9 additions & 5 deletions tests/test_invoke_watson_studio.py
@@ -1,8 +1,12 @@
# Licensed Materials - Property of IBM
# 5737-M66, 5900-AAA, 5900-A0N, 5725-S86, 5737-I75
# (C) Copyright IBM Corp. 2020, 2022 All Rights Reserved.
# US Government Users Restricted Rights - Use, duplication, or disclosure
# restricted by GSA ADP Schedule Contract with IBM Corp.
# *****************************************************************************
# © Copyright IBM Corp. 2020, 2022 All Rights Reserved.
#
# This program and the accompanying materials
# are made available under the terms of the Apache V2.0 license
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
#
# *****************************************************************************

import logging
import unittest