Merge pull request #260 from johntruckenbrodt/feature/archive_migrate
migration of outdated scene databases
johntruckenbrodt authored Sep 21, 2023
2 parents 739d09e + 7bbab6f commit dbe62ec
Showing 1 changed file with 116 additions and 74 deletions.
190 changes: 116 additions & 74 deletions pyroSAR/drivers.py
@@ -55,7 +55,7 @@
from spatialist import crsConvert, sqlite3, Vector, bbox
from spatialist.ancillary import parse_literal, finder

from sqlalchemy import create_engine, Table, MetaData, Column, Integer, String, exc
from sqlalchemy import create_engine, Table, MetaData, Column, Integer, String, exc, insert
from sqlalchemy import inspect as sql_inspect
from sqlalchemy.event import listen
from sqlalchemy.orm import sessionmaker
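
For context, the newly imported insert is SQLAlchemy's Core construct for building INSERT statements; the migration code further down uses it to copy rows from an old database into the new one. A minimal sketch of the pattern, with the engine URL and the inserted values assumed for illustration:

from sqlalchemy import MetaData, Table, create_engine, insert

engine = create_engine('sqlite:///scenes.db')  # hypothetical target database
meta = MetaData()
data = Table('data', meta, autoload_with=engine)  # reflect the existing 'data' table
with engine.begin() as conn:  # transaction commits on successful exit
    conn.execute(insert(data).values(scene='/data/S1A_scene.zip',
                                     outname_base='S1A_scene',
                                     product='GRD'))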
@@ -2233,7 +2233,7 @@ class Archive(object):
dbfile: str
the filename for the SpatiaLite database. This may either point to an existing database, or a new one will be created.
If postgres is set to True, this will be the name for the PostgreSQL database.
custom_fields: dict
custom_fields: dict or None
a dictionary containing additional non-standard database column names and data types;
the names must be attributes of the SAR scenes to be inserted (i.e. id.attr) or keys in their meta attribute
(i.e. id.meta['attr'])
@@ -2249,7 +2249,9 @@ class Archive(object):
required for postgres driver: port number to the database. Default: 5432
cleanup: bool
check whether all registered scenes exist and remove missing entries?
legacy: bool
open an outdated database in legacy mode to import into a new database.
Opening an outdated database without legacy mode will raise a RuntimeError.
Examples
----------
@@ -2299,10 +2301,20 @@ class Archive(object):
>>> scenes_s1 = finder(archive_s1, [r'^S1[AB].*\.zip'], regex=True, recursive=True)
>>> with Archive(dbfile, driver='postgres', user='user', password='password', host='host', port=5432) as archive:
>>> archive.insert(scenes_s1)
Importing an old database:
>>> from pyroSAR import Archive
>>> db_new = 'scenes.db'
>>> db_old = 'scenes_old.db'
>>> with Archive(db_new) as db:
>>> with Archive(db_old, legacy=True) as db_old:
>>> db.import_outdated(db_old)
"""

def __init__(self, dbfile, custom_fields=None, postgres=False, user='postgres',
password='1234', host='localhost', port=5432, cleanup=True):
password='1234', host='localhost', port=5432, cleanup=True,
legacy=False):
# check for driver; if postgres, check whether the server is reachable
if not postgres:
self.driver = 'sqlite'
@@ -2376,53 +2388,16 @@ def __init__(self, dbfile, custom_fields=None, postgres=False, user='postgres',
self.meta = MetaData(self.engine)
self.custom_fields = custom_fields

# create tables as schema
self.data_schema = Table('data', self.meta,
Column('sensor', String),
Column('orbit', String),
Column('orbitNumber_abs', Integer),
Column('orbitNumber_rel', Integer),
Column('cycleNumber', Integer),
Column('frameNumber', Integer),
Column('acquisition_mode', String),
Column('start', String),
Column('stop', String),
Column('product', String, primary_key=True),
Column('samples', Integer),
Column('lines', Integer),
Column('outname_base', String, primary_key=True),
Column('scene', String),
Column('hh', Integer),
Column('vv', Integer),
Column('hv', Integer),
Column('vh', Integer),
Column('bbox', Geometry(geometry_type='POLYGON',
management=True, srid=4326)))

# add custom fields
if self.custom_fields is not None:
for key, val in self.custom_fields.items():
if val in ['Integer', 'integer', 'int']:
self.data_schema.append_column(Column(key, Integer))
elif val in ['String', 'string', 'str']:
self.data_schema.append_column(Column(key, String))
else:
log.info('Value in dict custom_fields must be "integer" or "string"!')
# load or create tables
self.__init_data_table()
self.__init_duplicates_table()

# schema for duplicates
self.duplicates_schema = Table('duplicates', self.meta,
Column('outname_base', String, primary_key=True),
Column('scene', String, primary_key=True))
pk = sql_inspect(self.data_schema).primary_key
if 'product' not in pk.columns.keys() and not legacy:
raise RuntimeError("the 'data' table is missing a primary key 'product'. "
"Please create a new database and import the old one "
"opened in legacy mode using Archive.import_outdated.")

# create tables if not existing
if not sql_inspect(self.engine).has_table('data'):
log.debug("creating DB table 'data'")
self.data_schema.create(self.engine)
if not sql_inspect(self.engine).has_table('duplicates'):
log.debug("creating DB table 'duplicates'")
self.duplicates_schema.create(self.engine)

# reflect tables from (by now) existing db, make some variables available within self
self.Base = automap_base(metadata=self.meta)
self.Base.prepare(self.engine, reflect=True)
self.Data = self.Base.classes.data
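
The automap reflection above turns the now-existing tables into ORM classes, which is what makes self.Data queryable later on. A standalone sketch of the same mechanism, with the database name assumed:

from sqlalchemy import create_engine
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session

engine = create_engine('sqlite:///scenes.db')  # hypothetical existing database
Base = automap_base()
Base.prepare(engine, reflect=True)  # generate mapped classes from the schema
Data = Base.classes.data  # ORM class for the 'data' table
with Session(engine) as session:
    for row in session.query(Data).limit(5):
        print(row.scene)  # columns are available as attributes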
@@ -2446,7 +2421,7 @@ def add_tables(self, tables):
Parameters
----------
tables: :class:`sqlalchemy.schema.Table` or :obj:`list` of :class:`sqlalchemy.schema.Table`
tables: :class:`sqlalchemy.schema.Table` or list[:class:`sqlalchemy.schema.Table`]
The table(s) to be added to the database.
"""
created = []
@@ -2466,6 +2441,59 @@ def add_tables(self, tables):
self.Base = automap_base(metadata=self.meta)
self.Base.prepare(self.engine, reflect=True)
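
A sketch of how a custom table might be registered through add_tables, with the table name and columns invented for the example:

from sqlalchemy import Column, Integer, MetaData, String, Table
from pyroSAR import Archive

# hypothetical table tracking per-scene processing status
status = Table('processing_status', MetaData(),
               Column('outname_base', String, primary_key=True),
               Column('status', Integer))
with Archive('scenes.db') as archive:
    archive.add_tables(status)  # accepts a single Table or a list of Tables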

def __init_data_table(self):
if sql_inspect(self.engine).has_table('data'):
self.data_schema = Table('data', self.meta, autoload_with=self.engine)
return

log.debug("creating DB table 'data'")

self.data_schema = Table('data', self.meta,
Column('sensor', String),
Column('orbit', String),
Column('orbitNumber_abs', Integer),
Column('orbitNumber_rel', Integer),
Column('cycleNumber', Integer),
Column('frameNumber', Integer),
Column('acquisition_mode', String),
Column('start', String),
Column('stop', String),
Column('product', String, primary_key=True),
Column('samples', Integer),
Column('lines', Integer),
Column('outname_base', String, primary_key=True),
Column('scene', String),
Column('hh', Integer),
Column('vv', Integer),
Column('hv', Integer),
Column('vh', Integer),
Column('bbox', Geometry(geometry_type='POLYGON',
management=True, srid=4326)))
# add custom fields
if self.custom_fields is not None:
for key, val in self.custom_fields.items():
if val in ['Integer', 'integer', 'int']:
self.data_schema.append_column(Column(key, Integer))
elif val in ['String', 'string', 'str']:
self.data_schema.append_column(Column(key, String))
else:
log.info('Value in dict custom_fields must be "integer" or "string"!')

self.data_schema.create(self.engine)
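
The custom_fields branch above maps user-supplied names to extra columns of the 'data' table. A sketch of the caller's side, with the field names invented for the example:

from spatialist.ancillary import finder
from pyroSAR import Archive

# hypothetical extra columns; each key must be an attribute of the scene
# objects or a key in their meta dictionary, each value 'integer' or 'string'
fields = {'sliceNumber': 'integer', 'stationCode': 'string'}
scenes = finder('/data/archive', [r'^S1[AB].*\.zip'], regex=True, recursive=True)
with Archive('scenes.db', custom_fields=fields) as archive:
    archive.insert(scenes)  # values are read from each scene during insert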

def __init_duplicates_table(self):
# create table if not existing
if sql_inspect(self.engine).has_table('duplicates'):
self.duplicates_schema = Table('duplicates', self.meta, autoload_with=self.engine)
return

log.debug("creating DB table 'duplicates'")

self.duplicates_schema = Table('duplicates', self.meta,
Column('outname_base', String, primary_key=True),
Column('scene', String, primary_key=True))
self.duplicates_schema.create(self.engine)

@staticmethod
def __load_spatialite(dbapi_conn, connection_record):
"""
@@ -2476,7 +2504,7 @@ def __load_spatialite(dbapi_conn, connection_record):
dbapi_conn:
db engine
connection_record:
not sure what it does but it is needed by :func:`sqlalchemy.event.listen`
not sure what it does, but it is needed by :func:`sqlalchemy.event.listen`
"""
dbapi_conn.enable_load_extension(True)
# check which platform and use according mod_spatialite
@@ -2540,7 +2568,7 @@ def __select_missing(self, table):
Returns
-------
list
list[str]
the names of all scenes that are no longer stored in their registered location
"""
if table == 'data':
@@ -2786,7 +2814,7 @@ def get_colnames(self, table='data'):
Returns
-------
list
list[str]
the column names of the chosen table
"""
# get all columns of one table; note that geometry columns are not shown correctly
@@ -2807,7 +2835,7 @@ def get_tablenames(self, return_all=False):
Returns
-------
list
list[str]
the table names
"""
# TODO: make this dynamic
@@ -2846,26 +2874,36 @@ def get_unique_directories(self):

def import_outdated(self, dbfile):
"""
import an older data base in csv format
import an older database
Parameters
----------
dbfile: str
the file name of the old data base
dbfile: str or Archive
the old database. If this is a string, the name of a CSV file is expected.
Returns
-------
"""
with open(dbfile) as csvfile:
text = csvfile.read()
csvfile.seek(0)
dialect = csv.Sniffer().sniff(text)
reader = csv.DictReader(csvfile, dialect=dialect)
scenes = []
for row in reader:
scenes.append(row['scene'])
self.insert(scenes)
if isinstance(dbfile, str) and dbfile.endswith('csv'):
with open(dbfile) as csvfile:
text = csvfile.read()
csvfile.seek(0)
dialect = csv.Sniffer().sniff(text)
reader = csv.DictReader(csvfile, dialect=dialect)
scenes = []
for row in reader:
scenes.append(row['scene'])
self.insert(scenes)
elif isinstance(dbfile, Archive):
select = dbfile.conn.execute('SELECT * from data')
self.conn.execute(insert(self.Data).values(*select))
# duplicates in older databases may fit into the new data table
reinsert = dbfile.select_duplicates(value='scene')
if reinsert is not None:
self.insert(reinsert)
else:
raise RuntimeError("'dbfile' must either be a CSV file name or an Archive object")
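
Putting the two accepted input types together, a usage sketch with placeholder file names:

from pyroSAR import Archive

with Archive('scenes_new.db') as db:
    # variant 1: a CSV export of an old database
    db.import_outdated('scenes_old.csv')
    # variant 2: an outdated SpatiaLite database opened in legacy mode
    with Archive('scenes_old.db', legacy=True) as db_old:
        db.import_outdated(db_old)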

def move(self, scenelist, directory, pbar=False):
"""
@@ -2937,7 +2975,7 @@ def select(self, vectorobject=None, mindate=None, maxdate=None, date_strict=True
Parameters
----------
vectorobject: :class:`~spatialist.vector.Vector`
vectorobject: :class:`~spatialist.vector.Vector` or None
a geometry with which the scenes need to overlap
mindate: str or datetime.datetime or None
the minimum acquisition date; strings must be in format YYYYmmddTHHMMSS; default: None
Expand All @@ -2954,7 +2992,7 @@ def select(self, vectorobject=None, mindate=None, maxdate=None, date_strict=True
the selected scenes will be filtered to those that have not yet been processed. Default: None
recursive: bool
(only if `processdir` is not None) should the subdirectories of `processdir` also be scanned?
polarizations: list[str]
polarizations: list[str] or None
a list of polarization strings, e.g. ['HH', 'VV']
**args:
any further arguments (columns) that are registered in the database. See :meth:`~Archive.get_colnames()`
@@ -3024,7 +3062,11 @@ def select(self, vectorobject=None, mindate=None, maxdate=None, date_strict=True
else:
log.info('WARNING: argument vectorobject is ignored, must be of type spatialist.vector.Vector')

query = '''SELECT scene, outname_base FROM data WHERE {}'''.format(' AND '.join(arg_format))
if len(arg_format) > 0:
subquery = ' WHERE {}'.format(' AND '.join(arg_format))
else:
subquery = ''
query = '''SELECT scene, outname_base FROM data{}'''.format(subquery)
# the query gets assembled stepwise here
for val in vals:
query = query.replace('?', """'{0}'""", 1).format(val)
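
For illustration, a typical call to select combining the documented keywords with a column filter; the site file, dates and sensor value are placeholders:

from pyroSAR import Archive
from spatialist import Vector

with Archive('scenes.db') as archive:
    site = Vector('testsite.shp')  # hypothetical area of interest
    selection = archive.select(vectorobject=site,
                               mindate='20230101T000000',
                               maxdate='20231231T235959',
                               polarizations=['VV'],
                               sensor='S1A')  # extra kwargs map to DB columns
print(len(selection))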
@@ -3060,7 +3102,7 @@ def select_duplicates(self, outname_base=None, scene=None, value='id'):
Returns
-------
list
list[str]
the selected scene(s)
"""
if value == 'id':
@@ -3101,7 +3143,7 @@ def size(self):
Returns
-------
tuple
tuple[int]
the number of scenes in (1) the main table and (2) the duplicates table
"""
# ORM query
@@ -3133,7 +3175,7 @@ def drop_element(self, scene, with_duplicates=False):
Parameters
----------
scene: ID
scene: str
a SAR scene
with_duplicates: bool
True: delete matching entry in duplicates table
@@ -3194,7 +3236,7 @@ def drop_table(self, table):
Parameters
----------
table: str
tablename
the table name
Returns
-------
@@ -3224,7 +3266,7 @@ def __is_open(ip, port):
----------
ip: str
the IP address of the server
port: str
port: str or int
port of the server
Returns