import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.ext.associationproxy import association_proxy
import pprint

Base = declarative_base()


def _find_class_by_tablename(tablename):
    """Return the class registered on ``Base`` that maps *tablename*.

    :param tablename: String with name of table.
    :return: Class reference or None.
    """
    registry = getattr(Base, "_decl_class_registry", None)
    if registry is None:
        # Newer SQLAlchemy releases no longer expose ``_decl_class_registry``
        # on the base class; the same mapping is kept on its registry object.
        registry = Base.registry._class_registry
    for candidate in registry.values():
        # The registry also holds non-class marker entries, hence hasattr().
        if (hasattr(candidate, '__tablename__')
                and candidate.__tablename__ == tablename):
            return candidate
    return None


class Tag(Base):
    """A unique piece of tag text, stored in ``kk_tags``."""

    class TaggedObjectCollection(object):
        """List-emulating collection used for ``Tag.collection``.

        For every association appended, a fresh instance of the class mapped
        to the association's ``target_table`` is created, given the
        association's ``target_id`` as ``db_id``, and stored in ``data``.
        """

        __emulates__ = list

        def __init__(self):
            self.data = []

        def append(self, item):
            """Store a stand-in object for the row *item* points at.

            :param item: Object exposing ``target_table`` and ``target_id``.
            :raises TypeError: If no mapped class uses ``item.target_table``.
            """
            # print "collection append: %s" % item
            # self.data.append(item)
            # t = TagAssociation()
            # t.target_id = item.db_id
            # t.target_table = item.__tablename__
            cls = self._get_class_by_tablename(item.target_table)
            if cls is None:
                # Same exception type the bare ``cls()`` call used to raise,
                # but with a message that names the actual problem.
                raise TypeError(
                    "no mapped class found for table %r"
                    % (item.target_table,))
            thingy = cls()
            thingy.db_id = item.target_id
            self.data.append(thingy)

        def remove(self, item):
            """Do nothing; removal is not implemented yet."""
            pass

        def extend(self, item):
            """Do nothing; bulk extension is not implemented yet."""
            pass

        def __iter__(self):
            return iter(self.data)

        def __len__(self):
            return len(self.data)

        def _get_class_by_tablename(self, tablename):
            """Return class reference mapped to table.

            :param tablename: String with name of table.
            :return: Class reference or None.
            """
            return _find_class_by_tablename(tablename)

    __tablename__ = "kk_tags"

    db_id = sa.Column(sa.Integer, primary_key=True)
    text = sa.Column(sa.Unicode(255, convert_unicode=False), unique=True)

    # this & collection should be read-only?
    collection = sa.orm.relationship(
        "TagAssociation",
        collection_class=TaggedObjectCollection,
        enable_typechecks=False,
        # back_populates="tag_obj",
        primaryjoin="TagAssociation.tag_id==Tag.db_id"
    )

    def __init__(self, tag_text=None):
        """Create a tag whose ``text`` is *tag_text*.

        :param tag_text: Text of the tag, or None.
        """
        super(Tag, self).__init__()
        self.text = tag_text
        # if tag_text:
        #     select_expr = sa.select([self.__table__]).where(
        #         self.__table__.c.text == tag_text)
        #     session = sa.inspect(self).session
        #     extant = session.execute(select_expr)
        #     if extant:
        #         self.db_id = extant[0].db_id
        #         self.text = extant[0].text
        #     else:
        #         self.text = tag_text

    def enforce_unique_text(self, key, value):
        """Placeholder; currently does nothing."""
        pass
        # i have a feeling this may require a flush.

    def _get_class_by_tablename(self, tablename):
        """Return class reference mapped to table.

        :param tablename: String with name of table.
        :return: Class reference or None.
        """
        return _find_class_by_tablename(tablename)


class TagAssociation(Base):
    """Row of ``kk_tag_associations`` linking a tag to a target row.

    The target is identified by the pair ``target_table`` / ``target_id``.
    """

    __tablename__ = "kk_tag_associations"

    db_id = sa.Column(sa.Integer, primary_key=True)
    tag_id = sa.Column(sa.Integer, sa.ForeignKey("kk_tags.db_id"))
    target_table = sa.Column(sa.Unicode(255, convert_unicode=False))
    target_id = sa.Column(sa.Integer)

    tag_obj = sa.orm.relationship("Tag")
    # FIXME doesn't work, may not be needed.
    # tag_obj = sa.orm.relationship("Tag", back_populates="collection")

    # Exposes the related Tag's ``text`` directly on the association.
    tag = association_proxy("tag_obj", "text")

    def __eq__(self, other):
        """Compare associations by ``db_id``."""
        # NOTE(review): two instances whose db_id is still None compare equal
        # here, and defining __eq__ without __hash__ makes instances
        # unhashable on Python 3 -- confirm whether both are intended.
        if not isinstance(other, TagAssociation):
            return False
        return self.db_id == other.db_id

    # def __init__(self):
    #     if self.args and self.args.get("target_table"):
    #         self.target_table = self.args["target_table"]

    # @classmethod
    # def creator(cls, target_table):
    #     return lambda tags: TagAssociation(tags=tags,
    #                                        discriminator=target_table)

    @property
    def parent(self):
        """Return the attribute named ``"<target_table>_parent"``."""
        return getattr(self, "%s_parent" % self.target_table)

    def __repr__(self):
        return "tag assoc %s: %s:%s" % (
            self.db_id, self.target_table, self.target_id)


class Taggable(Base):
    """Abstract base that gives each mapped subclass tag support.

    Subclasses receive a ``kk_tag_associations`` relationship and a ``tags``
    association proxy built on top of it.
    """

    __abstract__ = True

    # FIXME this doesn't seem to actually be needed.
    @staticmethod
    def _outer_join_accessor_factory(collection_type, proxy):
        """Build a getter/setter pair for *proxy* that tolerates None.

        :param collection_type: Unused.
        :param proxy: Object whose ``value_attr`` names the proxied attribute.
        :return: Tuple ``(getter, setter)``.
        """
        def getter(obj):
            if obj is None:
                return []
            return getattr(obj, proxy.value_attr)

        def setter(obj, value):
            setattr(obj, proxy.value_attr, value)

        return getter, setter

    @declared_attr
    def kk_tag_associations(cls):
        """Build the per-class relationship to tag associations.

        Creates a ``TagAssociation`` subclass named ``"<Name>TagAssociation"``
        whose polymorphic identity is the class's table name, installs a
        ``tags`` association proxy on *cls*, and returns the relationship
        that joins on ``target_table`` and ``target_id``.
        """
        name = cls.__name__
        table = cls.__tablename__

        def cls_init(self, tag=None):
            # Deliberately starts the super() chain above TagAssociation.
            super(TagAssociation, self).__init__()
            if self.args and self.args.get("target_table"):
                self.target_table = u"%s" % self.args["target_table"]
            if tag:
                if isinstance(tag, Tag):
                    self.tag_obj = tag
                else:
                    # Plain text goes through the ``tag`` association proxy.
                    self.tag = tag

        assoc_cls = type(
            "%sTagAssociation" % name,
            (TagAssociation, ),
            dict(
                __mapper_args__={
                    "polymorphic_identity": table,
                    "polymorphic_on": "target_table",
                },
                __init__=cls_init,
                args={"target_table": table},
            )
        )

        cls.tags = association_proxy(
            "kk_tag_associations", "tag",
            # getset_factory=cls._outer_join_accessor_factory,
        )

        return sa.orm.relationship(
            assoc_cls,
            primaryjoin=(
                "and_(TagAssociation.target_table=='%s', "
                "foreign(TagAssociation.target_id)==%s.db_id)"
                % (table, name)),
        )


# @sa.event.listens_for(Tag.associations, 'append')
# def receive_append(target, value, initiator):
#     print "append %s to %s" % (value, target)
#     cls = target._get_class_by_tablename(value.target_table)
#     thingy = cls()
#     thingy.db_id = value.target_id
#     target.collection.append(thingy)

# TODO remove, extend, set, modified, init_scalar?, init_collection?