Home | History | Annotate | Download | only in test
# Author: Paul Kippes <kippesp@gmail.com>
      2 
      3 import unittest
      4 import sqlite3 as sqlite
      5 
      6 class DumpTests(unittest.TestCase):
      7     def setUp(self):
      8         self.cx = sqlite.connect(":memory:")
      9         self.cu = self.cx.cursor()
     10 
     11     def tearDown(self):
     12         self.cx.close()
     13 
     14     def CheckTableDump(self):
     15         expected_sqls = [
     16                 """CREATE TABLE "index"("index" blob);"""
     17                 ,
     18                 """INSERT INTO "index" VALUES(X'01');"""
     19                 ,
     20                 """CREATE TABLE "quoted""table"("quoted""field" text);"""
     21                 ,
     22                 """INSERT INTO "quoted""table" VALUES('quoted''value');"""
     23                 ,
     24                 "CREATE TABLE t1(id integer primary key, s1 text, " \
     25                 "t1_i1 integer not null, i2 integer, unique (s1), " \
     26                 "constraint t1_idx1 unique (i2));"
     27                 ,
     28                 "INSERT INTO \"t1\" VALUES(1,'foo',10,20);"
     29                 ,
     30                 "INSERT INTO \"t1\" VALUES(2,'foo2',30,30);"
     31                 ,
     32                 "CREATE TABLE t2(id integer, t2_i1 integer, " \
     33                 "t2_i2 integer, primary key (id)," \
     34                 "foreign key(t2_i1) references t1(t1_i1));"
     35                 ,
     36                 "CREATE TRIGGER trigger_1 update of t1_i1 on t1 " \
     37                 "begin " \
     38                 "update t2 set t2_i1 = new.t1_i1 where t2_i1 = old.t1_i1; " \
     39                 "end;"
     40                 ,
     41                 "CREATE VIEW v1 as select * from t1 left join t2 " \
     42                 "using (id);"
     43                 ]
     44         [self.cu.execute(s) for s in expected_sqls]
     45         i = self.cx.iterdump()
     46         actual_sqls = [s for s in i]
     47         expected_sqls = ['BEGIN TRANSACTION;'] + expected_sqls + \
     48             ['COMMIT;']
     49         [self.assertEqual(expected_sqls[i], actual_sqls[i])
     50             for i in range(len(expected_sqls))]
     51 
     52     def CheckUnorderableRow(self):
     53         # iterdump() should be able to cope with unorderable row types (issue #15545)
     54         class UnorderableRow:
     55             def __init__(self, cursor, row):
     56                 self.row = row
     57             def __getitem__(self, index):
     58                 return self.row[index]
     59         self.cx.row_factory = UnorderableRow
     60         CREATE_ALPHA = """CREATE TABLE "alpha" ("one");"""
     61         CREATE_BETA = """CREATE TABLE "beta" ("two");"""
     62         expected = [
     63             "BEGIN TRANSACTION;",
     64             CREATE_ALPHA,
     65             CREATE_BETA,
     66             "COMMIT;"
     67             ]
     68         self.cu.execute(CREATE_BETA)
     69         self.cu.execute(CREATE_ALPHA)
     70         got = list(self.cx.iterdump())
     71         self.assertEqual(expected, got)
     72 
     73 def suite():
     74     return unittest.TestSuite(unittest.makeSuite(DumpTests, "Check"))
     75 
     76 def test():
     77     runner = unittest.TextTestRunner()
     78     runner.run(suite())
     79 
# Run the dump tests when this file is executed directly as a script.
if __name__ == "__main__":
    test()
     82