# vim: set fileencoding=UTF-8
"""Unit tests for the tagging support provided by ``libkosokoso``.

Every test runs against a fresh in-memory SQLite database created in
``ks_basic.setUp``.
"""
from __future__ import print_function

import pprint
import sqlite3
import unittest

import pandas as pd
import sqlalchemy as sa

import libkosokoso as kk


class Foo(kk.Taggable):
    """Minimal taggable model stored in the ``foos`` table."""

    __tablename__ = 'foos'
    db_id = sa.Column(sa.Integer, primary_key=True)

    def __repr__(self):
        return "foo: id %s" % self.db_id


class Bar(kk.Taggable):
    """Second taggable model (``bars`` table), used for cross-table tests."""

    __tablename__ = 'bars'
    db_id = sa.Column(sa.Integer, primary_key=True)

    def __repr__(self):
        return "bar: id %s" % self.db_id


class ks_basic(unittest.TestCase):
    """Basic behaviour of ``kk.Tag`` and the ``tags`` attribute of models."""

    def setUp(self):
        """Create an in-memory database, its schema, and a session."""
        # self.engine = sa.create_engine('sqlite://', echo=True)
        self.engine = sa.create_engine('sqlite://', echo=False)
        kk.Base.metadata.create_all(self.engine)
        self.session = sa.orm.Session(self.engine)

    def tearDown(self):
        """Release the session and the engine's connections after each test."""
        # Close explicitly instead of relying on garbage collection, so no
        # connection from one test outlives it.
        self.session.close()
        self.engine.dispose()
        del self.session
        del self.engine

    def test_add_as_object(self):
        """Attach explicitly constructed Tag objects to a model instance."""
        a = Foo()
        self.session.add(a)
        self.session.commit()

        t = kk.Tag()
        t.text = u'test_tag'
        t2 = kk.Tag('test2')
        self.assertEqual('test2', t2.text)
        self.session.add(t)
        self.session.add(t2)
        self.session.commit()
        self.assertEqual(u'test_tag', t.text)
        self.assertEqual('test2', t2.text)

        self.assertEqual([], a.tags)
        a.tags.append(t)
        a.tags.append(t2)
        self.session.add(a)
        self.session.commit()

        # Drop the local references and look everything up again by key.
        a_id = a.db_id
        t_id = t.db_id
        t2_id = t2.db_id
        del a, t, t2

        a = self.session.query(Foo).get(a_id)
        t = self.session.query(kk.Tag).get(t_id)
        t2 = self.session.query(kk.Tag).get(t2_id)
        # ``tags`` compares equal to a list of the tag texts.
        self.assertEqual(['test_tag', 'test2'], a.tags)
        self.assertEqual(u'test_tag', t.text)
        self.assertEqual(u'test2', t2.text)

    def test_unique(self):
        """Add two Tag objects carrying the same text and commit them."""
        # NOTE(review): there is no assertion here; the test only checks
        # that the commit does not raise. Confirm the intended outcome
        # (e.g. how many rows should exist afterwards).
        t1 = kk.Tag('ttt')
        t2 = kk.Tag('ttt')
        self.session.add(t1)
        self.session.add(t2)
        self.session.commit()

    def test_collection(self):
        """Test access to the collection of objects associated with a tag."""
        f1 = Foo()
        f2 = Foo()
        b1 = Bar()
        t = kk.Tag('tag1')
        self.session.add_all([f1, f2, b1, t])
        self.session.commit()
        del f1, f2, b1, t

        f1 = self.session.query(Foo).get(1)
        f2 = self.session.query(Foo).get(2)
        b1 = self.session.query(Bar).get(1)
        t = self.session.query(kk.Tag).get(1)
        f1.tags.append(t)
        f2.tags.append(t)
        b1.tags.append(t)
        self.session.commit()

        # print(pd.read_sql_query("SELECT * FROM kk_tag_associations",
        #                         self.engine))
        # print("collection:")
        # for i in t.collection:
        #     print(i)
        # print("collection:")
        # for i in t.collection:
        #     print(i)
        self.assertEqual(3, len(t.collection))
        # self.assertEqual(3, len(t.collection))
        members = list(t.collection)
        # TODO do we need to test these?
        # these are the db_ids of the tag associations.
        # ta1 = self.session.query(kk.TagAssociation).get(1)
        # ta2 = self.session.query(kk.TagAssociation).get(2)
        # self.assertEqual(l[0], ta1)
        # self.assertNotEqual(l[1], ta1)
        # self.assertEqual(l[1], ta2)
        self.assertIsInstance(members[0], Foo)
        self.assertIsInstance(members[1], Foo)
        self.assertIsInstance(members[2], Bar)

    def test_association(self):
        """Check the fields of the association row behind a tagged object."""
        a = Foo()
        b = Foo()
        c = Bar()
        t1 = kk.Tag("bleh")
        t2 = kk.Tag("blah")
        for obj in [a, b]:
            obj.tags.append(t1)
        for obj in [b, c]:
            obj.tags.append(t2)
        self.session.add_all([a, b, c, t1, t2])
        self.session.commit()

        assoc = a.kk_tag_associations[0]
        self.assertEqual(assoc.tag_id, t1.db_id)
        self.assertEqual(assoc.target_table, "foos")
        self.assertEqual(assoc.target_id, a.db_id)

    def test_addstring(self):
        """Append a plain string to ``tags`` and find the object via the Tag."""
        a = Foo()
        a.tags.append('testtag')
        self.session.add(a)
        self.session.commit()
        self.assertEqual(['testtag'], a.tags)

        t = self.session.query(kk.Tag).get(1)
        tagged = list(t.collection)
        self.assertIs(type(a), type(tagged[0]))
        self.assertEqual(a.db_id, tagged[0].db_id)

    def test_multibyte(self):
        """Tag text containing non-ASCII characters survives a commit."""
        a = Foo()
        a.tags.append(u'地域')
        self.session.add(a)
        self.session.commit()
        self.assertEqual([u'地域'], a.tags)

    def test_multiple(self):
        """Tag several objects of different classes within one session."""
        a = Foo()
        a.tags.append('testtag')
        self.session.add(a)
        b = Bar()
        b.tags.append('tagtest')
        self.session.add(b)
        # self.session.commit()
        c = Foo()
        c.tags.append('thirdtag')
        self.session.add(c)
        self.session.commit()
        self.assertEqual(['testtag'], a.tags)
        self.assertEqual(['tagtest'], b.tags)
        self.assertEqual(['thirdtag'], c.tags)

        a_id = a.db_id
        b_id = b.db_id
        c_id = c.db_id
        del a, b, c

        a = self.session.query(Foo).get(a_id)
        b = self.session.query(Bar).get(b_id)
        c = self.session.query(Foo).get(c_id)
        self.assertEqual(['testtag'], a.tags)
        self.assertEqual(['tagtest'], b.tags)
        self.assertEqual(['thirdtag'], c.tags)

        # Re-use a tag text that is already attached to ``a``.
        b.tags.append('testtag')
        # TODO figure out autocommit behavior.
        # self.session.commit()
        del b
        b = self.session.query(Bar).get(b_id)
        self.assertEqual(['tagtest', 'testtag'], b.tags)
        # print(pd.read_sql_query("SELECT * FROM kk_tag_associations",
        #                         self.engine))
        # print(pd.read_sql_query("SELECT * FROM kk_tags",
        #                         self.engine))

    def test_list(self):
        """Assign a whole list to ``tags`` and then extend it."""
        a = Foo()
        expected = ['t1', 't2', 't3']
        a.tags = expected
        self.assertEqual(expected, a.tags)
        self.session.add(a)
        self.session.commit()

        a_id = a.db_id
        a = self.session.query(Foo).get(a_id)
        self.assertEqual(expected, a.tags)

        extra = ['t4', 't5']
        expected.extend(extra)
        a.tags.extend(extra)
        self.assertEqual(expected, a.tags)

    def test_addstring_repeated(self):
        """The same tag text on two objects maps to a single Tag row."""
        a1 = Foo()
        a2 = Foo()
        a1.tags.append("tag1")
        a2.tags.append("tag1")
        self.session.add(a1)
        # self.session.commit()
        t = self.session.query(kk.Tag).filter_by(text="tag1").first()
        self.assertEqual(t.text, "tag1")
        self.assertEqual(t.db_id, 1)
        self.session.add(a2)
        # self.session.commit()
        self.assertEqual(a2.tags, ['tag1'])
        # print(pd.read_sql_query("SELECT * FROM kk_tags",
        #                         self.engine))
        del a1, a2, t

        (a1, a2) = tuple(self.session.query(Foo))
        t = self.session.query(kk.Tag).first()
        self.assertEqual(len(list(self.session.query(kk.Tag))), 1)
        self.assertEqual(a1.tags, ['tag1'])
        self.assertEqual(a2.tags, ['tag1'])
        self.assertEqual(t.db_id, 1)
        # print(pd.read_sql_query("SELECT * FROM kk_tag_associations",
        #                         self.engine))
        # print(pd.read_sql_query("SELECT * FROM kk_tags",
        #                         self.engine))

    # TODO test access to Tag objects
    # TODO test concurrent setting the same tag from different
    #      processes.
    # TODO delete unused tags
    # TODO test class inheritance. Should work in simple cases;
    #      somewhat difficult, possibly infeasible in complex cases.