Skip to content

Commit

Permalink
- [bug] bulk_insert() fixes:
Browse files Browse the repository at this point in the history
    1. bulk_insert() operation was
       not working most likely since the 0.2 series
       when used with an engine. #41
    2. Repaired bulk_insert() to complete when
       used against a lower-case-t table and executing
       with only one set of parameters, working
       around SQLAlchemy bug #2461 in this regard.
    3. bulk_insert() uses "inline=True" so that phrases
       like RETURNING and such don't get invoked for
       single-row bulk inserts.
    4. bulk_insert() will check that you're passing
       a list of dictionaries in, raises TypeError
       if not detected.
  • Loading branch information
zzzeek committed Apr 7, 2012
1 parent 456d364 commit a7ada78
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 9 deletions.
18 changes: 18 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
0.3.1
=====
- [bug] bulk_insert() fixes:

1. bulk_insert() operation was
not working most likely since the 0.2 series
when used with an engine. #41
2. Repaired bulk_insert() to complete when
used against a lower-case-t table and executing
with only one set of parameters, working
around SQLAlchemy bug #2461 in this regard.
3. bulk_insert() uses "inline=True" so that phrases
like RETURNING and such don't get invoked for
single-row bulk inserts.
4. bulk_insert() will check that you're passing
a list of dictionaries in, raises TypeError
if not detected.

0.3.0
=====
- [general] The focus of 0.3 is to clean up
Expand Down
2 changes: 1 addition & 1 deletion alembic/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from os import path

__version__ = '0.3.0'
__version__ = '0.3.1'

package_dir = path.abspath(path.dirname(__file__))

Expand Down
11 changes: 9 additions & 2 deletions alembic/ddl/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,21 @@ def drop_index(self, index):
self._exec(schema.DropIndex(index))

def bulk_insert(self, table, rows):
if not isinstance(rows, list):
raise TypeError("List expected")
elif rows and not isinstance(rows[0], dict):
raise TypeError("List of dictionaries expected")
if self.as_sql:
for row in rows:
self._exec(table.insert().values(**dict(
self._exec(table.insert(inline=True).values(**dict(
(k, _literal_bindparam(k, v, type_=table.c[k].type))
for k, v in row.items()
)))
else:
self._exec(table.insert(), *rows)
# work around http://www.sqlalchemy.org/trac/ticket/2461
if not hasattr(table, '_autoincrement_column'):
table._autoincrement_column = None
self._exec(table.insert(inline=True), multiparams=rows)

def compare_type(self, inspector_column, metadata_column):

Expand Down
2 changes: 2 additions & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ def _get_dialect(name):
except KeyError:
dialect_mod = getattr(__import__('sqlalchemy.dialects.%s' % name).dialects, name)
_dialects[name] = d = dialect_mod.dialect()
if name == 'postgresql':
d.implicit_returning = True
return d

def assert_compiled(element, assert_string, dialect=None):
Expand Down
123 changes: 117 additions & 6 deletions tests/test_bulk_insert.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,32 @@
from tests import op_fixture
from tests import op_fixture, _sqlite_testing_config, eq_, assert_raises_message
from alembic import op
from sqlalchemy import Integer, \
UniqueConstraint, String
from sqlalchemy.sql import table, column
from unittest import TestCase
from sqlalchemy import Table, Column, MetaData

def _test_bulk_insert(dialect, as_sql):
def _table_fixture(dialect, as_sql):
context = op_fixture(dialect, as_sql)
t1 = table("ins_table",
column('id', Integer),
column('v1', String()),
column('v2', String()),
)
return context, t1

def _big_t_table_fixture(dialect, as_sql):
context = op_fixture(dialect, as_sql)
t1 = Table("ins_table", MetaData(),
Column('id', Integer, primary_key=True),
Column('v1', String()),
Column('v2', String()),
)
return context, t1

def _test_bulk_insert(dialect, as_sql):
context, t1 = _table_fixture(dialect, as_sql)

op.bulk_insert(t1, [
{'id':1, 'v1':'row v1', 'v2':'row v5'},
{'id':2, 'v1':'row v2', 'v2':'row v6'},
Expand All @@ -19,6 +35,22 @@ def _test_bulk_insert(dialect, as_sql):
])
return context

def _test_bulk_insert_single(dialect, as_sql):
context, t1 = _table_fixture(dialect, as_sql)

op.bulk_insert(t1, [
{'id':1, 'v1':'row v1', 'v2':'row v5'},
])
return context

def _test_bulk_insert_single_bigt(dialect, as_sql):
context, t1 = _big_t_table_fixture(dialect, as_sql)

op.bulk_insert(t1, [
{'id':1, 'v1':'row v1', 'v2':'row v5'},
])
return context

def test_bulk_insert():
context = _test_bulk_insert('default', False)
context.assert_(
Expand All @@ -35,10 +67,6 @@ def test_bulk_insert_wrong_cols():
op.bulk_insert(t1, [
{'v1':'row v1', },
])
# TODO: this is wrong because the test fixture isn't actually
# doing what the real context would do. Sending this to
# PG is going to produce a RETURNING clause. fixture would
# need to be beefed up
context.assert_(
'INSERT INTO ins_table (id, v1, v2) VALUES (%(id)s, %(v1)s, %(v2)s)'
)
Expand All @@ -49,6 +77,24 @@ def test_bulk_insert_pg():
'INSERT INTO ins_table (id, v1, v2) VALUES (%(id)s, %(v1)s, %(v2)s)'
)

def test_bulk_insert_pg_single():
context = _test_bulk_insert_single('postgresql', False)
context.assert_(
'INSERT INTO ins_table (id, v1, v2) VALUES (%(id)s, %(v1)s, %(v2)s)'
)

def test_bulk_insert_pg_single_as_sql():
context = _test_bulk_insert_single('postgresql', True)
context.assert_(
"INSERT INTO ins_table (id, v1, v2) VALUES (1, 'row v1', 'row v5')"
)

def test_bulk_insert_pg_single_big_t_as_sql():
context = _test_bulk_insert_single_bigt('postgresql', True)
context.assert_(
"INSERT INTO ins_table (id, v1, v2) VALUES (1, 'row v1', 'row v5')"
)

def test_bulk_insert_mssql():
context = _test_bulk_insert('mssql', False)
context.assert_(
Expand Down Expand Up @@ -86,3 +132,68 @@ def test_bulk_insert_as_sql_mssql():
"INSERT INTO ins_table (id, v1, v2) VALUES (4, 'row v4', 'row v8')",
'SET IDENTITY_INSERT ins_table OFF'
)

def test_invalid_format():
context, t1 = _table_fixture("sqlite", False)
assert_raises_message(
TypeError,
"List expected",
op.bulk_insert, t1, {"id":5}
)

assert_raises_message(
TypeError,
"List of dictionaries expected",
op.bulk_insert, t1, [(5, )]
)

class RoundTripTest(TestCase):
def setUp(self):
from sqlalchemy import create_engine
from alembic.migration import MigrationContext
self.conn = create_engine("sqlite://").connect()
self.conn.execute("""
create table foo(
id integer primary key,
data varchar(50),
x integer
)
""")
context = MigrationContext.configure(self.conn)
self.op = op.Operations(context)
self.t1 = table('foo',
column('id'),
column('data'),
column('x')
)
def tearDown(self):
self.conn.close()

def test_single_insert_round_trip(self):
self.op.bulk_insert(self.t1,
[{'data':"d1", "x":"x1"}]
)

eq_(
self.conn.execute("select id, data, x from foo").fetchall(),
[
(1, "d1", "x1"),
]
)

def test_bulk_insert_round_trip(self):
self.op.bulk_insert(self.t1, [
{'data':"d1", "x":"x1"},
{'data':"d2", "x":"x2"},
{'data':"d3", "x":"x3"},
])

eq_(
self.conn.execute("select id, data, x from foo").fetchall(),
[
(1, "d1", "x1"),
(2, "d2", "x2"),
(3, "d3", "x3")
]
)

0 comments on commit a7ada78

Please sign in to comment.