366 lines
10 KiB
Python
366 lines
10 KiB
Python
# vim: set fileencoding=UTF-8
|
|
|
|
from __future__ import print_function
|
|
|
|
import pandas as pd
|
|
import pprint
|
|
import sqlalchemy as sa
|
|
import sqlite3
|
|
import textwrap
|
|
import unittest
|
|
|
|
import libkosokoso as kk
|
|
|
|
# TODO get this working -- merge bases?
|
|
# DummyBase = sa.ext.declarative.declarative_base()
|
|
class DummyBase(object):
|
|
"""dummy"""
|
|
|
|
class Foo(DummyBase, kk.Taggable):
|
|
__tablename__ = 'foos'
|
|
db_id = sa.Column(sa.Integer, primary_key=True)
|
|
def __repr__(self):
|
|
return "foo: id %s" % self.db_id
|
|
|
|
def __eq__(self, other):
|
|
return self.db_id == other.db_id
|
|
|
|
class Bar(DummyBase, kk.Taggable):
|
|
__tablename__ = 'bars'
|
|
db_id = sa.Column(sa.Integer, primary_key=True)
|
|
def __repr__(self):
|
|
return "bar: id %s" % self.db_id
|
|
|
|
class ks_basic(unittest.TestCase):
|
|
def setUp(self):
|
|
# self.engine = sa.create_engine('sqlite://', echo=True)
|
|
self.engine = sa.create_engine('sqlite://', echo=False)
|
|
kk.Base.metadata.create_all(self.engine)
|
|
self.session = sa.orm.Session(self.engine)
|
|
|
|
def tearDown(self):
|
|
del self.engine
|
|
|
|
def test_add_as_object(self):
|
|
a = Foo()
|
|
self.session.add(a)
|
|
self.session.commit()
|
|
|
|
t = kk.Tag()
|
|
t.text = u'test_tag'
|
|
t2 = kk.Tag('test2')
|
|
self.assertEqual('test2', t2.text)
|
|
self.session.add(t)
|
|
self.session.add(t2)
|
|
self.session.commit()
|
|
self.assertEqual(u'test_tag', t.text)
|
|
self.assertEqual('test2', t2.text)
|
|
|
|
self.assertEqual([], a.tags)
|
|
|
|
a.tags.append(t)
|
|
a.tags.append(t2)
|
|
self.session.add(a)
|
|
self.session.commit()
|
|
|
|
a_id = a.db_id
|
|
t_id = t.db_id
|
|
t2_id = t2.db_id
|
|
del a, t, t2
|
|
a = self.session.query(Foo).get(a_id)
|
|
t = self.session.query(kk.Tag).get(t_id)
|
|
t2 = self.session.query(kk.Tag).get(t2_id)
|
|
self.assertEqual(['test_tag', 'test2'], a.tags)
|
|
self.assertEqual(u'test_tag', t.text)
|
|
self.assertEqual(u'test2', t2.text)
|
|
|
|
def test_unique(self):
|
|
t1 = kk.Tag('ttt')
|
|
t2 = kk.Tag('ttt')
|
|
|
|
self.session.add(t1)
|
|
self.session.add(t2)
|
|
self.session.commit()
|
|
|
|
def test_collection(self):
|
|
"""Test access to the collection of objects associated with a
|
|
tag."""
|
|
f1 = Foo()
|
|
f2 = Foo()
|
|
b1 = Bar()
|
|
t = kk.Tag('tag1')
|
|
|
|
self.session.add(f1)
|
|
self.session.add(f2)
|
|
self.session.add(b1)
|
|
self.session.add(t)
|
|
self.session.commit()
|
|
|
|
del f1, f2, b1, t
|
|
|
|
f1 = self.session.query(Foo).get(1)
|
|
f2 = self.session.query(Foo).get(2)
|
|
b1 = self.session.query(Bar).get(1)
|
|
t = self.session.query(kk.Tag).get(1)
|
|
t2 = kk.Tag('tag2')
|
|
t3 = kk.Tag('tag3')
|
|
self.session.add(t2)
|
|
self.session.add(t3)
|
|
|
|
f1.tags.append(t)
|
|
f2.tags.append(t)
|
|
b1.tags.append(t)
|
|
|
|
f2.tags.append(t2)
|
|
b1.tags.append(t3)
|
|
b1.tags.append(t2)
|
|
self.session.commit()
|
|
|
|
# print(pd.read_sql_query("SELECT * FROM kk_tag_associations",
|
|
# self.engine))
|
|
# print("collection:")
|
|
# for i in t.collection:
|
|
# print(i)
|
|
# print("collection:")
|
|
# for i in t.collection:
|
|
# print(i)
|
|
|
|
self.assertEqual(3, len(t.collection))
|
|
self.assertEqual(2, len(t2.collection))
|
|
self.assertEqual(1, len(t3.collection))
|
|
l = list(t.collection)
|
|
|
|
# TODO do we need to test these?
|
|
ta1 = self.session.query(kk.TagAssociation).get(1)
|
|
ta2 = self.session.query(kk.TagAssociation).get(2)
|
|
self.assertEqual(l[0].db_id, ta1.target_id)
|
|
self.assertNotEqual(l[1].db_id, ta1.target_id)
|
|
self.assertEqual(l[1].db_id, ta2.target_id)
|
|
|
|
self.assertTrue(isinstance(l[0], Foo))
|
|
self.assertTrue(isinstance(l[1], Foo))
|
|
self.assertTrue(isinstance(l[2], Bar))
|
|
|
|
l2 = list(t2.collection)
|
|
self.assertTrue(isinstance(l2[0], Foo))
|
|
self.assertTrue(isinstance(l2[1], Bar))
|
|
|
|
l3 = list(t3.collection)
|
|
self.assertTrue(isinstance(l3[0], Bar))
|
|
|
|
def test_writable_collection(self):
|
|
"""Test adding objects to a tag's collection member"""
|
|
f1 = Foo()
|
|
f2 = Foo()
|
|
b1 = Bar()
|
|
t = kk.Tag('tag1')
|
|
|
|
self.session.add(f1)
|
|
self.session.add(f2)
|
|
self.session.add(b1)
|
|
self.session.add(t)
|
|
self.session.commit()
|
|
|
|
# t.collection.append(f1)
|
|
t.collection.append(f1)
|
|
t.collection.append(f2)
|
|
t.collection.append(b1)
|
|
self.session.commit()
|
|
|
|
# print(pd.read_sql_query("SELECT * FROM kk_tag_associations",
|
|
# self.engine))
|
|
# print("collection:")
|
|
# for i in t.collection:
|
|
# print(i)
|
|
|
|
self.assertEqual(3, len(t.collection))
|
|
l = list(t.collection)
|
|
|
|
self.assertTrue(isinstance(l[0], Foo))
|
|
self.assertEqual(l[0], f1)
|
|
self.assertTrue(isinstance(l[2], Bar))
|
|
|
|
def test_association(self):
|
|
a = Foo()
|
|
b = Foo()
|
|
c = Bar()
|
|
t1 = kk.Tag("bleh")
|
|
t2 = kk.Tag("blah")
|
|
|
|
for i in [a,b]:
|
|
i.tags.append(t1)
|
|
for i in [b,c]:
|
|
i.tags.append(t2)
|
|
for i in [a,b,c,t1,t2]:
|
|
self.session.add(i)
|
|
self.session.commit()
|
|
|
|
l = a.kk_tag_associations[0]
|
|
self.assertEqual(l.tag_id, t1.db_id)
|
|
self.assertEqual(l.target_table, "foos")
|
|
self.assertEqual(l.target_id, a.db_id)
|
|
|
|
def test_addstring(self):
|
|
a = Foo()
|
|
a.tags.append('testtag')
|
|
self.session.add(a)
|
|
self.session.commit()
|
|
|
|
self.assertEqual(['testtag'], a.tags)
|
|
t = self.session.query(kk.Tag).get(1)
|
|
self.assertEqual(type(a), type(list(t.collection)[0]))
|
|
self.assertEqual(a.db_id, list(t.collection)[0].db_id)
|
|
|
|
def test_multibyte(self):
|
|
a = Foo()
|
|
a.tags.append(u'地域')
|
|
self.session.add(a)
|
|
self.session.commit()
|
|
|
|
self.assertEqual([u'地域'], a.tags)
|
|
|
|
def test_multiple(self):
|
|
a = Foo()
|
|
a.tags.append('testtag')
|
|
self.session.add(a)
|
|
|
|
b = Bar()
|
|
b.tags.append('tagtest')
|
|
self.session.add(b)
|
|
# self.session.commit()
|
|
|
|
c = Foo()
|
|
c.tags.append('thirdtag')
|
|
self.session.add(c)
|
|
self.session.commit()
|
|
|
|
self.assertEqual(['testtag'], a.tags)
|
|
self.assertEqual(['tagtest'], b.tags)
|
|
self.assertEqual(['thirdtag'], c.tags)
|
|
a_id = a.db_id
|
|
b_id = b.db_id
|
|
c_id = c.db_id
|
|
|
|
del a, b, c
|
|
|
|
a = self.session.query(Foo).get(a_id)
|
|
b = self.session.query(Bar).get(b_id)
|
|
c = self.session.query(Foo).get(c_id)
|
|
self.assertEqual(['testtag'], a.tags)
|
|
self.assertEqual(['tagtest'], b.tags)
|
|
self.assertEqual(['thirdtag'], c.tags)
|
|
b.tags.append('testtag')
|
|
# TODO figure out autocommit behavior.
|
|
# self.session.commit()
|
|
|
|
del b
|
|
b = self.session.query(Bar).get(b_id)
|
|
self.assertEqual(['tagtest', 'testtag'], b.tags)
|
|
|
|
# print(pd.read_sql_query("SELECT * FROM kk_tag_associations",
|
|
# self.engine))
|
|
# print(pd.read_sql_query("SELECT * FROM kk_tags",
|
|
# self.engine))
|
|
|
|
def test_list(self):
|
|
a = Foo()
|
|
l = ['t1', 't2', 't3']
|
|
a.tags = l
|
|
self.assertEqual(l, a.tags)
|
|
self.session.add(a)
|
|
self.session.commit()
|
|
|
|
a_id = a.db_id
|
|
a = self.session.query(Foo).get(a_id)
|
|
self.assertEqual(l, a.tags)
|
|
|
|
l2 = ['t4', 't5']
|
|
l.extend(l2)
|
|
a.tags.extend(l2)
|
|
self.assertEqual(l, a.tags)
|
|
|
|
def test_pretty(self):
|
|
"""verify that the example in the readme works."""
|
|
a = Foo()
|
|
a.tags.append('tag1')
|
|
a.tags.extend(['tag2', 'tag3'])
|
|
self.session.add(a)
|
|
self.session.commit()
|
|
|
|
expected = textwrap.dedent("""
|
|
db_id
|
|
0 1
|
|
db_id tag_id target_table target_id
|
|
0 1 1 foos 1
|
|
1 2 2 foos 1
|
|
2 3 3 foos 1
|
|
db_id text
|
|
0 1 tag1
|
|
1 2 tag2
|
|
2 3 tag3
|
|
tag1
|
|
tag2
|
|
tag3"""
|
|
)
|
|
|
|
actual = []
|
|
actual.append(str(pd.read_sql_query("SELECT * FROM foos",
|
|
self.engine)))
|
|
actual.append(str(pd.read_sql_query("SELECT * FROM kk_tag_associations",
|
|
self.engine)))
|
|
actual.append(str(pd.read_sql_query("SELECT * FROM kk_tags",
|
|
self.engine)))
|
|
for i in a.tags:
|
|
actual.append(i)
|
|
actual = '\n'.join(actual)
|
|
self.assertEqual(expected, '\n%s' % actual)
|
|
|
|
def test_addstring_repeated(self):
|
|
a1 = Foo()
|
|
a2 = Foo()
|
|
|
|
a1.tags.append("tag1")
|
|
a2.tags.append("tag1")
|
|
self.session.add(a1)
|
|
# self.session.commit()
|
|
t = self.session.query(kk.Tag).filter_by(text="tag1").first()
|
|
self.assertEqual(t.text, "tag1")
|
|
self.assertEqual(t.db_id, 1)
|
|
self.session.add(a2)
|
|
# self.session.commit()
|
|
self.assertEqual(a2.tags, ['tag1'])
|
|
# print(pd.read_sql_query("SELECT * FROM kk_tags",
|
|
# self.engine))
|
|
|
|
del a1, a2, t
|
|
(a1, a2) = tuple(self.session.query(Foo))
|
|
t = self.session.query(kk.Tag).first()
|
|
self.assertEqual(len(list(self.session.query(kk.Tag))), 1)
|
|
self.assertEqual(a1.tags, ['tag1'])
|
|
self.assertEqual(a2.tags, ['tag1'])
|
|
self.assertEqual(t.db_id, 1)
|
|
|
|
# print(pd.read_sql_query("SELECT * FROM kk_tag_associations",
|
|
# self.engine))
|
|
# print(pd.read_sql_query("SELECT * FROM kk_tags",
|
|
# self.engine))
|
|
|
|
def test_tag_object_access(self):
|
|
a1 = Foo()
|
|
for i in ['tag1', 'tag2', 'tag3']:
|
|
a1.tags.append(i)
|
|
(t1, t2, t3) = [ta.tag_obj for ta in a1.kk_tag_associations]
|
|
self.assertEqual(t1, kk.Tag('tag1'))
|
|
self.assertEqual(t2, kk.Tag('tag2'))
|
|
self.assertEqual(t3, kk.Tag('tag3'))
|
|
|
|
|
|
# TODO test concurrent setting the same tag from different
|
|
# processes.
|
|
|
|
# TODO delete unused tags
|
|
|
|
# TODO test class inheritance. should work in simple cases. .
|
|
# . somewhat difficult, possibly infeasible in complex cases.
|
|
|