commit b12815917ee3fc621dd13d90f4346d429f308634 Author: chris t Date: Sun Apr 8 00:03:34 2018 -0700 initial commit, before cleanup. diff --git a/Readme.rst b/Readme.rst new file mode 100644 index 0000000..9393c2b --- /dev/null +++ b/Readme.rst @@ -0,0 +1,52 @@ +Libkosokoso -- simple tagging with sqlalchemy. +============================================== + +Libkosokoso provides compact and table-stingy tags to +sqlalchemy-backed objects. + +Usage +----- + +:: + + import libkosokoso as kk + + class Foo(kk.Taggable): + __tablename__ = 'foos' + + a = Foo() + a.tags.append('tag1') + a.tags.extend(['tag2', 'tag3']) + +and so forth. + +It's also possible to add kk.Tag objects directly to the tags +property, but they'll be treated as strings. To access the tag +objects, do something like:: + + return [ta.tag_obj for ta in a.kk_tag_associations] + +This will bypass the association proxy and return a list of Tag +objects. + + +Design considerations +--------------------- + +My basic reasoning was that at some point I'd have to go and muck +about in the database by hand, and I wanted a structure that was easy +to perceive and modify. + +I explicitly did not aim for efficiency. The underlying code is +object-happy and probably extremely inefficient. + + +Why is it called that? +---------------------- + +"kosokoso" is Japanese onomatopoeia for sneaking or whispering +secretly. I picked the name because tags always feel like a kind of +buzzing side channel to me. + +(Originally, it was "guzuguzu", which is, like, slow. This took me +way longer to write than I was expecting.) diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..321df17 --- /dev/null +++ b/__init__.py @@ -0,0 +1 @@ +from kosokoso import * diff --git a/kosokoso.py b/kosokoso.py new file mode 100644 index 0000000..4f48c52 --- /dev/null +++ b/kosokoso.py @@ -0,0 +1,202 @@ +import sqlalchemy as sa +from sqlalchemy.ext.declarative import declarative_base, declared_attr +from sqlalchemy.ext.associationproxy import association_proxy + +import pprint + +Base = declarative_base() + + +class Tag(Base): + + class TaggedObjectCollection(object): + __emulates__ = list + + def __init__(self): + self.data = [] + + def append(self, item): + # print "collection append: %s" % item + # self.data.append(item) + + # t = TagAssociation() + # t.target_id = item.db_id + # t.target_table = item.__tablename__ + + try: + cls = self._get_class_by_tablename(item.target_table) + thingy = cls() + thingy.db_id = item.target_id + self.data.append(thingy) + except TypeError: + raise + + def remove(self, item): + pass + def extend(self, item): + pass + def __iter__(self): + return iter(self.data) + def __len__(self): + return len(self.data) + + def _get_class_by_tablename(self, tablename): + """Return class reference mapped to table. + + :param tablename: String with name of table. + :return: Class reference or None. + """ + for c in Base._decl_class_registry.values(): + if (hasattr(c, '__tablename__') + and c.__tablename__ == tablename): + return c + + + __tablename__ = "kk_tags" + db_id = sa.Column(sa.Integer, primary_key=True) + text = sa.Column(sa.Unicode(255, convert_unicode=False), + unique=True) + # this & collection should be read-only? + collection = sa.orm.relationship("TagAssociation", + collection_class=TaggedObjectCollection, + enable_typechecks = False, + # back_populates="tag_obj", + primaryjoin="TagAssociation.tag_id==Tag.db_id" + ) + + def __init__(self, tag_text=None): + super(Tag, self).__init__() + self.text = tag_text + # if tag_text: + # select_expr = sa.select([self.__table__]).where( + # self.__table__.c.text == tag_text) + # session = sa.inspect(self).session + # extant = session.execute(select_expr) + + # if extant: + # self.db_id = extant[0].db_id + # self.text = extant[0].text + # else: + # self.text = tag_text + + def enforce_unique_text(self, key, value): + pass + # i have a feeling this may require a flush. + + + def _get_class_by_tablename(self, tablename): + """Return class reference mapped to table. + + :param tablename: String with name of table. + :return: Class reference or None. + """ + for c in Base._decl_class_registry.values(): + if (hasattr(c, '__tablename__') + and c.__tablename__ == tablename): + return c + + + +class TagAssociation(Base): + __tablename__ = "kk_tag_associations" + db_id = sa.Column(sa.Integer, primary_key=True) + tag_id = sa.Column(sa.Integer, sa.ForeignKey("kk_tags.db_id")) + target_table = sa.Column(sa.Unicode(255, convert_unicode=False)) + target_id = sa.Column(sa.Integer) + + tag_obj = sa.orm.relationship("Tag") + # FIXME doesn't work, may not be needed. + # tag_obj = sa.orm.relationship("Tag", back_populates="collection") + tag = association_proxy("tag_obj", "text") + + + def __eq__(self, other): + if not isinstance(other, TagAssociation): + return False + return self.db_id == other.db_id + + # def __init__(self): + # if self.args and self.args.get("target_table"): + # self.target_table = self.args["target_table"] + # @classmethod + # def creator(cls, target_table): + # return lambda tags: TagAssociation(tags=tags, + # discriminator=target_table) + + @property + def parent(self): + return getattr(self, "%s_parent" % self.target_table) + + def __repr__(self): + return "tag assoc %s: %s:%s" % (self.db_id, + self.target_table, self.target_id) + +class Taggable(Base): + __abstract__ = True + + # FIXME this doesn't seem to actually be needed. + @staticmethod + def _outer_join_accessor_factory(collection_type, proxy): + def getter(obj): + if obj is None: + return [] + return getattr(obj, proxy.value_attr) + def setter(obj, value): + setattr(obj, proxy.value_attr, value) + return getter, setter + + @declared_attr + def kk_tag_associations(cls): + name = cls.__name__ + table = cls.__tablename__ + + def cls_init(self, tag=None): + super(TagAssociation, self).__init__() + if self.args and self.args.get("target_table"): + target_table = self.args["target_table"] + self.target_table = u"%s" % target_table + if tag: + if isinstance(tag, Tag): + self.tag_obj = tag + else: + self.tag = tag + + + assoc_cls = type( + "%sTagAssociation" % name, + (TagAssociation, ), + dict( + __mapper_args__ = { + "polymorphic_identity": table, + "polymorphic_on": "target_table", + }, + __init__ = cls_init, + args = { + "target_table": table + }, + ) + ) + cls.tags = association_proxy( + "kk_tag_associations", "tag", + # getset_factory=cls._outer_join_accessor_factory, + ) + + + return sa.orm.relationship( + assoc_cls, + primaryjoin=( + "and_(TagAssociation.target_table=='%s', " + "foreign(TagAssociation.target_id)==%s.db_id)" + %(table, name)), + ) + + +# @sa.event.listens_for(Tag.associations, 'append') +# def receive_append(target, value, initiator): +# print "append %s to %s" % (value, target) +# cls = target._get_class_by_tablename(value.target_table) +# thingy = cls() +# thingy.db_id = value.target_id +# target.collection.append(thingy) + +# TODO remove, extend, set, modified, init_scalar?, init_collection? diff --git a/tests/test_basic.py b/tests/test_basic.py new file mode 100644 index 0000000..fadd6d8 --- /dev/null +++ b/tests/test_basic.py @@ -0,0 +1,204 @@ +# vim: set fileencoding=UTF-8 + +from __future__ import print_function + +import pandas as pd +import pprint +import sqlalchemy as sa +import sqlite3 +import unittest +import libkosokoso as kk + +class Foo(kk.Taggable): + __tablename__ = 'foos' + db_id = sa.Column(sa.Integer, primary_key=True) + def __repr__(self): + return "foo: id %s" % self.db_id + +class Bar(kk.Taggable): + __tablename__ = 'bars' + db_id = sa.Column(sa.Integer, primary_key=True) + def __repr__(self): + return "bar: id %s" % self.db_id + +class ks_basic(unittest.TestCase): + def setUp(self): + # self.engine = sa.create_engine('sqlite://', echo=True) + self.engine = sa.create_engine('sqlite://', echo=False) + kk.Base.metadata.create_all(self.engine) + self.session = sa.orm.Session(self.engine) + + def test_add_as_object(self): + a = Foo() + self.session.add(a) + self.session.commit() + + t = kk.Tag() + t.text = u'test_tag' + t2 = kk.Tag('test2') + self.assertEqual('test2', t2.text) + self.session.add(t) + self.session.add(t2) + self.session.commit() + self.assertEqual(u'test_tag', t.text) + self.assertEqual('test2', t2.text) + + self.assertEqual([], a.tags) + + a.tags.append(t) + a.tags.append(t2) + self.session.add(a) + self.session.commit() + + a_id = a.db_id + t_id = t.db_id + del a, t, t2 + a = self.session.query(Foo).get(a_id) + t = self.session.query(kk.Tag).get(t_id) + self.assertEqual(['test_tag', 'test2'], a.tags) + self.assertEqual(u'test_tag', t.text) + + def test_collection(self): + """Test access to the collection of objects associated with a + tag.""" + f1 = Foo() + f2 = Foo() + b1 = Bar() + t = kk.Tag('tag1') + + self.session.add(f1) + self.session.add(f2) + self.session.add(b1) + self.session.add(t) + self.session.commit() + + del f1, f2, b1, t + + f1 = self.session.query(Foo).get(1) + f2 = self.session.query(Foo).get(2) + b1 = self.session.query(Bar).get(1) + t = self.session.query(kk.Tag).get(1) + # FIXME there's a bug where adding via text doesn't check if a + # row with the same text already exists. + f1.tags.append(t) + f2.tags.append(t) + b1.tags.append(t) + self.session.commit() + + # print(pd.read_sql_query("SELECT * FROM kk_tag_associations", + # self.engine)) + # print("collection:") + # for i in t.collection: + # print(i) + # print("collection:") + # for i in t.collection: + # print(i) + + self.assertEqual(3, len(t.collection)) + # self.assertEqual(3, len(t.collection)) + l = list(t.collection) + + # these are the db_ids of the tag associations. + # ta1 = self.session.query(kk.TagAssociation).get(1) + # ta2 = self.session.query(kk.TagAssociation).get(2) + # self.assertEqual(l[0], ta1) + # self.assertNotEqual(l[1], ta1) + # self.assertEqual(l[1], ta2) + + self.assertTrue(isinstance(l[0], Foo)) + self.assertTrue(isinstance(l[1], Foo)) + self.assertTrue(isinstance(l[2], Bar)) + + def test_addstring(self): + a = Foo() + a.tags.append('testtag') + self.session.add(a) + self.session.commit() + + self.assertEqual(['testtag'], a.tags) + t = self.session.query(kk.Tag).get(1) + self.assertEqual(type(a), type(list(t.collection)[0])) + self.assertEqual(a.db_id, list(t.collection)[0].db_id) + + def test_multibyte(self): + a = Foo() + a.tags.append(u'地域') + self.session.add(a) + self.session.commit() + + self.assertEqual([u'地域'], a.tags) + + def test_multiple(self): + a = Foo() + a.tags.append('testtag') + self.session.add(a) + + b = Bar() + b.tags.append('tagtest') + self.session.add(b) + self.session.commit() + + c = Foo() + c.tags.append('thirdtag') + self.session.add(c) + self.session.commit() + + self.assertEqual(['testtag'], a.tags) + self.assertEqual(['tagtest'], b.tags) + self.assertEqual(['thirdtag'], c.tags) + a_id = a.db_id + b_id = b.db_id + c_id = c.db_id + + del a, b, c + + a = self.session.query(Foo).get(a_id) + b = self.session.query(Bar).get(b_id) + c = self.session.query(Foo).get(c_id) + self.assertEqual(['testtag'], a.tags) + self.assertEqual(['tagtest'], b.tags) + self.assertEqual(['thirdtag'], c.tags) + b.tags.append('tagtest2') + # TODO figure out autocommit behavior. + # self.session.commit() + + del b + b = self.session.query(Bar).get(b_id) + self.assertEqual(['tagtest', 'tagtest2'], b.tags) + + # TODO test class inheritance. should work in simple cases. . + # . somewhat difficult, possibly infeasible in complex cases. + + # print(pd.read_sql_query("SELECT * FROM kk_tag_associations", + # self.engine)) + + def test_list(self): + a = Foo() + l = ['t1', 't2', 't3'] + a.tags = l + self.assertEqual(l, a.tags) + self.session.add(a) + self.session.commit() + + a_id = a.db_id + a = self.session.query(Foo).get(a_id) + self.assertEqual(l, a.tags) + + l2 = ['t4', 't5'] + l.extend(l2) + a.tags.extend(l2) + self.assertEqual(l, a.tags) + + # def test_addstring_repeated(self): + # a1 = Foo() + # a2 = Foo() + + # a1.tags.append("tag1") + # a1.tags.append("tag1") + # self.session.add(a1, a2) + # self.session.commit() + + + # TODO test access to Tag objects + + # TODO delete unused tags