libkosokoso/tests/test_basic.py

430 lines
12 KiB
Python

# vim: set fileencoding=UTF-8
from __future__ import print_function
import mock
import pandas as pd
import pprint
import sqlalchemy as sa
import sqlalchemy.orm as orm
import sqlite3
import textwrap
import unittest
from sqlalchemy import text
import libkosokoso as kk
class DummyBase(sa.orm.DeclarativeBase):
    """Declarative base shared by the mapped classes in this test module."""
# DummyBase stands in for the declarative base an application would
# presumably already be using. SQLAlchemy only allows relationships
# between objects that share a base, hence init_base dynamically
# generates classes derived from the base it is given; the result is
# merged into the kk module namespace here.
kk.__dict__.update(kk.init_base(DummyBase))
# TODO make it so DummyBase doesn't have to come after the mixin.
# Actually, is DummyBase needed at all?
class Foo(kk.Taggable, DummyBase):
    """Minimal taggable model mapped to the ``foos`` table.

    Two objects compare equal when their ``db_id`` values are equal.
    """

    __tablename__ = 'foos'
    db_id = sa.Column(sa.Integer, primary_key=True)

    # NOTE: no __hash__ is defined alongside __eq__, so Python sets
    # __hash__ to None for this class (instances are unhashable).

    def __repr__(self):
        """Return a short label containing the primary key."""
        return "foo: id %s" % self.db_id

    def __eq__(self, other):
        """Compare by ``db_id``.

        Returns ``NotImplemented`` (instead of raising ``AttributeError``)
        when ``other`` has no ``db_id`` attribute, so that comparisons
        against unrelated objects fall back to Python's default handling.
        """
        try:
            other_id = other.db_id
        except AttributeError:
            return NotImplemented
        return self.db_id == other_id
class Bar(kk.Taggable, DummyBase):
    """Second taggable model, mapped to the ``bars`` table."""

    __tablename__ = 'bars'
    db_id = sa.Column(sa.Integer, primary_key=True)

    def __repr__(self):
        """Return a short label containing the primary key."""
        ident = self.db_id
        return "bar: id %s" % ident
class ks_basic(unittest.TestCase):
    """Exercise tagging of ``Foo``/``Bar`` objects against in-memory SQLite.

    ``setUp`` builds a fresh engine, schema and session for every test.

    Debugging aid: the tables can be dumped the same way ``test_pretty``
    does it, e.g.
    ``pd.read_sql_query(text("SELECT * FROM kk_tag_associations"), conn)``.
    """

    def setUp(self):
        """Create an in-memory SQLite engine, the schema and a session."""
        # Pass echo=True instead to have the engine log the SQL it emits.
        self.engine = sa.create_engine('sqlite://', echo=False)
        DummyBase.metadata.create_all(self.engine)
        self.session = sa.orm.Session(self.engine)

    def tearDown(self):
        """Close the session and dispose of the engine created in setUp."""
        self.session.close()
        self.engine.dispose()
        del self.engine

    def test_add_as_object(self):
        """Tags built as ``kk.Tag`` objects can be attached and re-fetched."""
        a = Foo()
        self.session.add(a)
        self.session.commit()

        t = kk.Tag()
        t.text = u'test_tag'
        t2 = kk.Tag('test2')
        self.assertEqual('test2', t2.text)
        self.session.add(t)
        self.session.add(t2)
        self.session.commit()
        self.assertEqual(u'test_tag', t.text)
        self.assertEqual('test2', t2.text)
        self.assertEqual([], a.tags)

        a.tags.append(t)
        a.tags.append(t2)
        self.session.add(a)
        self.session.commit()

        # Remember the keys, drop the local references and fetch again.
        a_id = a.db_id
        t_id = t.db_id
        t2_id = t2.db_id
        del a, t, t2

        a = self.session.get(Foo, a_id)
        t = self.session.get(kk.Tag, t_id)
        t2 = self.session.get(kk.Tag, t2_id)
        self.assertEqual(['test_tag', 'test2'], a.tags)
        self.assertEqual(u'test_tag', t.text)
        self.assertEqual(u'test2', t2.text)

    def test_unique(self):
        """Adding two ``Tag`` objects with the same text yields one row."""
        t1 = kk.Tag('ttt')
        t2 = kk.Tag('ttt')
        self.session.add(t1)
        self.session.add(t2)
        self.session.commit()
        del t1, t2

        ts = self.session.query(kk.Tag).all()
        self.assertEqual(1, len(ts))

    def test_collection(self):
        """Test access to the collection of objects associated with a
        tag."""
        f1 = Foo()
        f2 = Foo()
        b1 = Bar()
        t = kk.Tag('tag1')
        self.session.add(f1)
        self.session.add(f2)
        self.session.add(b1)
        self.session.add(t)
        self.session.commit()
        del f1, f2, b1, t

        f1 = self.session.get(Foo, 1)
        f2 = self.session.get(Foo, 2)
        b1 = self.session.get(Bar, 1)
        t = self.session.get(kk.Tag, 1)
        t2 = kk.Tag('tag2')
        t3 = kk.Tag('tag3')
        self.session.add(t2)
        self.session.add(t3)

        f1.tags.append(t)
        f2.tags.append(t)
        b1.tags.append(t)
        f2.tags.append(t2)
        b1.tags.append(t3)
        b1.tags.append(t2)
        self.session.commit()

        self.assertEqual(3, len(t.collection))
        self.assertEqual(2, len(t2.collection))
        self.assertEqual(1, len(t3.collection))

        tagged = list(t.collection)
        # TODO do we need to test these?
        ta1 = self.session.get(kk.TagAssociation, 1)
        ta2 = self.session.get(kk.TagAssociation, 2)
        self.assertEqual(tagged[0].db_id, ta1.target_id)
        self.assertNotEqual(tagged[1].db_id, ta1.target_id)
        self.assertEqual(tagged[1].db_id, ta2.target_id)
        self.assertIsInstance(tagged[0], Foo)
        self.assertIsInstance(tagged[1], Foo)
        self.assertIsInstance(tagged[2], Bar)

        tagged2 = list(t2.collection)
        self.assertIsInstance(tagged2[0], Foo)
        self.assertIsInstance(tagged2[1], Bar)

        tagged3 = list(t3.collection)
        self.assertIsInstance(tagged3[0], Bar)

    def test_writable_collection(self):
        """Test adding objects to a tag's collection member"""
        f1 = Foo()
        f2 = Foo()
        b1 = Bar()
        t = kk.Tag('tag1')
        self.session.add(f1)
        self.session.add(f2)
        self.session.add(b1)
        self.session.add(t)
        self.session.commit()

        t.collection.append(f1)
        t.collection.append(f2)
        t.collection.append(b1)
        self.session.commit()

        self.assertEqual(3, len(t.collection))
        tagged = list(t.collection)
        self.assertIsInstance(tagged[0], Foo)
        self.assertEqual(tagged[0], f1)
        self.assertIsInstance(tagged[2], Bar)

    def test_merge_collection(self):
        """Merging one tag's collection into another equalises their sizes."""
        f1 = Foo()
        f2 = Foo()
        t1 = kk.Tag('tag1')
        t2 = kk.Tag('tag2')
        # all this stuff has to be in the database to get an id.
        # FIXME make tagging in general work with transient objects.
        self.session.add(f1)
        self.session.add(f2)
        self.session.add(t1)
        self.session.add(t2)
        self.session.commit()

        t1.collection.append(f1)
        t1.collection.append(f2)
        t2.collection.append(f2)
        t2.collection.merge(t1.collection)

        # NOTE(review): c1 and c2 are computed but never asserted on --
        # presumably meant for the disabled equality check below; confirm.
        c1 = sorted(t1.collection)
        c2 = sorted(t2.collection)
        self.assertEqual(len(t1.collection), len(t2.collection))
        # FIXME doesn't work because collection equality includes
        # equality of tag names, which of course doesn't exist.
        # self.assertEqual(t1.collection, t2.collection)

    def test_association(self):
        """An association row records the tag id, target table and target id."""
        a = Foo()
        b = Foo()
        c = Bar()
        t1 = kk.Tag("bleh")
        t2 = kk.Tag("blah")
        for obj in (a, b):
            obj.tags.append(t1)
        for obj in (b, c):
            obj.tags.append(t2)
        for obj in (a, b, c, t1, t2):
            self.session.add(obj)
        self.session.commit()

        assoc = a.kk_tag_associations[0]
        self.assertEqual(assoc.tag_id, t1.db_id)
        self.assertEqual(assoc.target_table, "foos")
        self.assertEqual(assoc.target_id, a.db_id)

    def test_addstring(self):
        """Appending a plain string to ``tags`` creates a usable tag."""
        a = Foo()
        a.tags.append('testtag')
        self.session.add(a)
        self.session.commit()
        self.assertEqual(['testtag'], a.tags)

        t = self.session.get(kk.Tag, 1)
        first = list(t.collection)[0]
        self.assertEqual(type(a), type(first))
        self.assertEqual(a.db_id, first.db_id)

    def test_multibyte(self):
        """Non-ASCII tag text is kept intact across a commit."""
        a = Foo()
        a.tags.append(u'地域')
        self.session.add(a)
        self.session.commit()
        self.assertEqual([u'地域'], a.tags)

    def test_multiple(self):
        """Several objects of different classes keep their own tag lists."""
        a = Foo()
        a.tags.append('testtag')
        self.session.add(a)
        b = Bar()
        b.tags.append('tagtest')
        self.session.add(b)
        c = Foo()
        c.tags.append('thirdtag')
        self.session.add(c)
        self.session.commit()
        self.assertEqual(['testtag'], a.tags)
        self.assertEqual(['tagtest'], b.tags)
        self.assertEqual(['thirdtag'], c.tags)

        a_id = a.db_id
        b_id = b.db_id
        c_id = c.db_id
        del a, b, c

        a = self.session.get(Foo, a_id)
        b = self.session.get(Bar, b_id)
        c = self.session.get(Foo, c_id)
        self.assertEqual(['testtag'], a.tags)
        self.assertEqual(['tagtest'], b.tags)
        self.assertEqual(['thirdtag'], c.tags)

        b.tags.append('testtag')
        # TODO figure out autocommit behavior (no commit is issued
        # before the re-fetch below).
        del b
        b = self.session.get(Bar, b_id)
        self.assertEqual(['tagtest', 'testtag'], b.tags)

    def test_list(self):
        """``tags`` can be assigned from a list and extended afterwards."""
        a = Foo()
        names = ['t1', 't2', 't3']
        a.tags = names
        self.assertEqual(names, a.tags)
        self.session.add(a)
        self.session.commit()

        a_id = a.db_id
        a = self.session.get(Foo, a_id)
        self.assertEqual(names, a.tags)

        more = ['t4', 't5']
        names.extend(more)
        a.tags.extend(more)
        self.assertEqual(names, a.tags)

    def test_pretty(self):
        """verify that the example in the readme works."""
        a = Foo()
        a.tags.append('tag1')
        a.tags.extend(['tag2', 'tag3'])
        self.session.add(a)
        self.session.commit()

        # Column alignment follows pandas' default DataFrame rendering;
        # textwrap.dedent strips the common 12-space margin.
        expected = textwrap.dedent("""
               db_id
            0      1
               db_id  tag_id target_table  target_id
            0      1       1         foos          1
            1      2       2         foos          1
            2      3       3         foos          1
               db_id  text
            0      1  tag1
            1      2  tag2
            2      3  tag3
            tag1
            tag2
            tag3"""
        )

        actual = []
        with self.engine.begin() as conn:
            actual.append(str(pd.read_sql_query(text("SELECT * FROM foos"),
                                                conn)))
            actual.append(str(pd.read_sql_query(
                text("SELECT * FROM kk_tag_associations"), conn)))
            actual.append(str(pd.read_sql_query(text("SELECT * FROM kk_tags"),
                                                conn)))
        for name in a.tags:
            actual.append(name)
        actual = '\n'.join(actual)
        # expected starts with a newline, so prepend one to the output.
        self.assertEqual(expected, '\n%s' % actual)

    def test_addstring_repeated(self):
        """The same tag string on two objects maps to a single ``Tag`` row."""
        a1 = Foo()
        a2 = Foo()
        a1.tags.append("tag1")
        a2.tags.append("tag1")
        # implicitly adds tags.
        self.session.add(a1)
        # this does an autoflush.
        t = self.session.query(kk.Tag).filter_by(text="tag1").first()
        self.assertEqual(t.text, "tag1")
        self.assertEqual(t.db_id, 1)
        self.session.add(a2)
        # FIXME committing here causes the test to fail.
        # self.session.commit()
        # self.assertEqual(a2.tags, ['tag1'])
        del a1, a2, t

        (a1, a2) = tuple(self.session.query(Foo))
        t = self.session.query(kk.Tag).first()
        self.assertEqual(len(list(self.session.query(kk.Tag))), 1)
        self.assertEqual(a1.tags, ['tag1'])
        self.assertEqual(a2.tags, ['tag1'])
        self.assertEqual(t.db_id, 1)

    def test_tag_object_access(self):
        """Each association exposes its ``Tag`` through ``tag_obj``."""
        a1 = Foo()
        for name in ['tag1', 'tag2', 'tag3']:
            a1.tags.append(name)
        (t1, t2, t3) = [ta.tag_obj for ta in a1.kk_tag_associations]
        self.assertEqual(t1, kk.Tag('tag1'))
        self.assertEqual(t2, kk.Tag('tag2'))
        self.assertEqual(t3, kk.Tag('tag3'))
# TODO test concurrently setting the same tag from different
# processes.
# TODO delete unused tags
# TODO test class inheritance. Should work in simple cases...
# somewhat difficult, possibly infeasible in complex cases.
class kk_mocktest(unittest.TestCase):
    """Check that patching ``libkosokoso.Tag`` behaves as expected."""

    @mock.patch('libkosokoso.Tag', autospec=True)
    def test_tag_mock(self, tag_mock):
        mocked_tag = kk.Tag('test')
        self.assertIsInstance(mocked_tag, mock.NonCallableMagicMock)

        target = Foo()
        target.tags.append(mocked_tag)
        first_assoc = target.kk_tag_associations[0]
        self.assertIsInstance(first_assoc.tag_obj,
                              mock.NonCallableMagicMock)

        # FIXME bypasses mock
        target.tags.append('test2')