[email protected] host1:db port1/db name1?charset=utf8', 'db2': 'mysql+pymysql://db user2:db...">

当前位置:网站首页>Flash operates on multiple databases

Flash operates on multiple databases

2022-04-23 17:59:00 Feng Ye 520

config.py:

SQLALCHEMY_BINDS = {
    "db1": "mysql+pymysql://db_user1:[email protected]_host1:db_port1/db_name1?charset=utf8",
    "db2": "mysql+pymysql://db_user2:[email protected]_host2:db_port2/db_name2?charset=utf8",
}



models.py:

def dynamic_model(bind_key, model_n):
    properties = {
        "__tablename__" = model_n.__tablename__,
        "__bind_key__" = bind_key
    }
    for col in dir(model_n):
        if col.startswith('_'):
            continue
        properties[col] = getattr(model_n, col)
    model = type(model_n.__tablename__, (db.Model,), properties)
    return model

Custom Serialization :

class Serialize:
    def __init__(self, model):
        self.query = model.query
        self.model = model
        self.columns = []
        self.page_size = request.args.get('page_size', type=int, default=10)
        self.current_page = request.args.get('page', default=1, type=int)
        self.start = (self.current_page - 1) * self.page_size
        self.end = self.start + self.page_size
        self.field_func_map = {}

    def field(self, *columns):
        self.columns = columns
        self.query = db.session.query(*[getattr(self.model, column) for column in columns])
        return self

    def search(self, **kw_map):
        for column in kw_map:
            if kw_map[column]:
                self.query = self.query.filter(
                    getattr(self.model, column).like("%{}%".format('/' + kw_map[column]), escape='/'))
        return self

    def exclude(self, **kw_map):
        for column in kw_map:
            self.query = self.query.filter(
                getattr(self.model, column) != kw_map[column])
        return self

    def custom_handle(self, **kargs):
        self.field_func_map = kargs
        return self

    def to_json(self, **kw_map):
        res = []
        for column in kw_map:
            self.query = self.query.filter(
                getattr(self.model, column) == kw_map[column])
        if self.columns:
            for q in self.query.slice(self.start, self.end):
                obj_dict = {}
                for i, c in enumerate(self.columns):
                    if c in self.field_func_map:
                        obj_dict[c] = self.field_func_map[c.name](q[i])
                    else:
                        obj_dict[c] = q[i]
                res.append(obj_dict)
        else:
            for q in self.query.slice(self.start, self.end):
                obj_dict = {}
                for c in class_mapper(q.__class__).columns:
                    if c.name in self.field_func_map:
                        obj_dict[c.name] = self.field_func_map[c.name](getattr(q, c.name))
                    else:
                        obj_dict[c.name] = getattr(q, c.name)
                res.append(obj_dict)
        return res

    def total(self):
        return self.query.count()

    def data(self, **kw_map):
        return {
            'data': self.to_json(**kw_map),
            'total': self.total(),
            'currentPage': self.current_page,
            'pageSize': self.page_size
        }

版权声明
本文为[Feng Ye 520]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/04/202204230546133951.html