make sqlite tz datetime aware

This patch relies on python-dateutil both to parse timestamp strings and for a simple tzinfo implementation (we only need UTC).

Closes #2777713

authorSylvain Thénault <sylvain.thenault@logilab.fr>
changesetcb3ff5d1c2e3
branchdefault
phasedraft
hiddenyes
parent revision#aabdf2b17fe0 [sqlite] don't call abspath is :memory: is given
child revision<not specified>
files modified by this revision
__pkginfo__.py
logilab/database/sqlite.py
test/unittest_sqlite.py
# HG changeset patch
# User Sylvain Thénault <sylvain.thenault@logilab.fr>
# Date 1448362049 -3600
# Tue Nov 24 11:47:29 2015 +0100
# Node ID cb3ff5d1c2e3a496a6bf6077c79eb31ad97e1f9d
# Parent aabdf2b17fe01d68dea2de1d529c0ce1f4931f76
make sqlite tz datetime aware

This patch relies on python-dateutil both to parse timestamp strings and for a
simple tzinfo implementation (we only need UTC).

Closes #2777713

diff --git a/__pkginfo__.py b/__pkginfo__.py
@@ -38,10 +38,11 @@
1  install_requires = [
2      'setuptools',
3      'logilab-common >= 0.63.2',
4      'six >= 1.4.0',
5      'Yapps2',
6 +    'python-dateutil',
7      ]
8 
9  tests_require = [
10      'psycopg2',
11      ]
diff --git a/logilab/database/sqlite.py b/logilab/database/sqlite.py
@@ -21,19 +21,20 @@
12  """
13  __docformat__ = "restructuredtext en"
14 
15  from warnings import warn
16  from os.path import abspath
17 -import os
18  import re
19  import inspect
20 
21  from six import PY2, text_type
22 +from dateutil import tz, parser
23 
24  from logilab.common.date import strptime
25  from logilab import database as db
26 
27 +
28  class _Sqlite3Adapter(db.DBAPIAdapter):
29      # no type code in sqlite3
30      BINARY = 'XXX'
31      STRING = 'XXX'
32      DATETIME = 'XXX'
@@ -148,10 +149,17 @@
33                                   hours*3600 + minutes*60 + seconds,
34                                   microseconds)
35 
36              sqlite.register_converter('interval', convert_timedelta)
37 
38 +            def convert_tzdatetime(data):
39 +                dt = parser.parse(data)
40 +                if dt.tzinfo is None:
41 +                    dt = dt.replace(tzinfo=tz.tzutc())
42 +                return dt
43 +            sqlite.register_converter('tzdatetime', convert_tzdatetime)
44 +
45 
46      def connect(self, host='', database='', user='', password='', port=None,
47                  schema=None, extra_args=None):
48          """Handles sqlite connection format"""
49          sqlite = self._native_module
@@ -232,10 +240,16 @@
50      intersect_all_support = False
51      alter_column_support = False
52 
53      TYPE_CONVERTERS = db._GenericAdvFuncHelper.TYPE_CONVERTERS.copy()
54 
55 +    TYPE_MAPPING = db._GenericAdvFuncHelper.TYPE_MAPPING.copy()
56 +    TYPE_MAPPING.update({
57 +        'TZTime': 'tztime',
58 +        'TZDatetime': 'tzdatetime',
59 +    })
60 +
61      def backup_commands(self, backupfile, keepownership=True,
62                          dbname=None, dbhost=None, dbport=None, dbuser=None, dbschema=None):
63          dbname = dbname or self.dbname
64          return ['gzip -c %s > %s' % (dbname, backupfile)]
65 
diff --git a/test/unittest_sqlite.py b/test/unittest_sqlite.py
@@ -1,6 +1,6 @@
66 -# copyright 2003-2011 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
67 +# copyright 2003-2015 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
68  # contact http://www.logilab.fr/ -- mailto:contact@logilab.fr
69  #
70  # This file is part of logilab-database.
71  #
72  # logilab-database is free software: you can redistribute it and/or modify it
@@ -14,34 +14,54 @@
73  # for more details.
74  #
75  # You should have received a copy of the GNU Lesser General Public License along
76  # with logilab-database. If not, see <http://www.gnu.org/licenses/>.
77  import unittest
78 +import sqlite3
79 +from datetime import datetime
80 +
81 +from dateutil.tz import tzutc
82 
83  from logilab.common.testlib import MockConnection
84 
85 -from logilab.database import get_db_helper
86 +from logilab.database import sqlite as lgdbsqlite
87 +from logilab.database import get_connection, get_db_helper
88 
89 
90  class SQLiteHelperTC(unittest.TestCase):
91 
92      def setUp(self):
93          self.cnx = MockConnection( () )
94          self.helper = get_db_helper('sqlite')
95 
96      def test_type_map(self):
97 +        self.assertEqual(self.helper.TYPE_MAPPING['TZDatetime'], 'tzdatetime')
98          self.assertEqual(self.helper.TYPE_MAPPING['Datetime'], 'timestamp')
99          self.assertEqual(self.helper.TYPE_MAPPING['String'], 'text')
100          self.assertEqual(self.helper.TYPE_MAPPING['Password'], 'bytea')
101          self.assertEqual(self.helper.TYPE_MAPPING['Bytes'], 'bytea')
102 
103 +
104  class SQLiteAdapterTC(unittest.TestCase):
105 
106      def test_only_one_lazy_module_initialization(self):
107 -        import sqlite3
108 -        from logilab.database import sqlite as lgdbsqlite
109          self.assertFalse(lgdbsqlite._Sqlite3Adapter._module_is_initialized)
110          adapter = lgdbsqlite._Sqlite3Adapter(sqlite3)
111          self.assertTrue(adapter._module_is_initialized)
112 
113 +    def test_tzsupport(self):
114 +        cnx = get_connection(database=':memory:', driver='sqlite')
115 +        cu = cnx.cursor()
116 +        cu.execute('CREATE TABLE tztest(tzt tzdatetime)')
117 +        now = datetime.now(tzutc())
118 +        cu.execute('INSERT INTO tztest VALUES (%(tzt)s)', {'tzt': now})
119 +        cu.execute('SELECT * FROM tztest')
120 +        dbnow = cu.fetchone()[0]
121 +        self.assertEqual(dbnow, now)
122 +
123 +        cu.execute('UPDATE tztest SET tzt=(%(tzt)s)', {'tzt': datetime.utcnow()})
124 +        cu.execute('SELECT * FROM tztest')
125 +        dbnow = cu.fetchone()[0]
126 +        self.assertEqual(dbnow.tzinfo, tzutc())
127 +
128  if __name__ == '__main__':
129      unittest.main()