Fork me on GitHub

鱼书

flask最小原型

1
2
3
4
5
6
7
8
9
from flask import Flask
app = Flask(__name__)

@app.route('/hello/')
def hello_world():
return 'Hello World!'

if __name__ == '__main__':
app.run(debug=True)

@app.route('/hello/')@app.route(\'/hello\')的区别:
浏览器输入 http://127.0.0.1:5000/hello,两个视图函数都可以匹配到
浏览器输入 http://127.0.0.1:5000/hello/,只能匹配到前者
如果视图函数是前者,输入的是http://127.0.0.1:5000/hello,
flask会进行重定向到http://127.0.0.1:5000/hello/页面

url的注册方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Basically this example::

@app.route('/')
def index():
pass

Is equivalent to the following::

def index():
pass
app.add_url_rule('/', 'index', index)

If the view_func is not provided you will need to connect the endpoint
to a view function like so::

app.view_functions['index'] = index

Internally :meth:`route` invokes :meth:`add_url_rule` so if you want
to customize the behavior via subclassing you only need to change
this method.

@app.route()内部调用的依然是add_url_rule方法,源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def route(self, rule, **options):
"""A decorator that is used to register a view function for a
given URL rule. This does the same thing as :meth:`add_url_rule`
but is intended for decorator usage::

@app.route('/')
def index():
return 'Hello World'

For more information refer to :ref:`url-route-registrations`.

:param rule: the URL rule as string
:param endpoint: the endpoint for the registered URL rule. Flask
itself assumes the name of the view function as
endpoint
:param options: the options to be forwarded to the underlying
:class:`~werkzeug.routing.Rule` object. A change
to Werkzeug is handling of method options. methods
is a list of methods this rule should be limited
to (``GET``, ``POST`` etc.). By default a rule
just listens for ``GET`` (and implicitly ``HEAD``).
Starting with Flask 0.6, ``OPTIONS`` is implicitly
added and handled by the standard request handling.
"""
def decorator(f):
endpoint = options.pop('endpoint', None)
self.add_url_rule(rule, endpoint, f, **options)
return f
return decorator

配置文件

https://dormousehole.readthedocs.io/en/latest/config.html#id4

新建一个配置文件config.py,在这个文件中进行变量的设置
内置变量参考上面的链接,变量必须大写

导入方式

1
2
3
4
5
6
7
8
9
10
# 第一种
from flask import Flask
import config
app = Flask(__name__)
app.config.from_object(config)

# 第二种
from flask import Flask
app = Flask(__name__)
app.config.from_object('config')

源码

1
2
3
4
5
Example of module-based configuration::

app.config.from_object('yourapplication.default_config')
from yourapplication import default_config
app.config.from_object(default_config)

run参数

1
2
def run(self, host=None, port=None, debug=None,
load_dotenv=True, **options):

图书信息搜索

api地址

1
2
3
4
# 关键字搜索
http://t.yushu.im/v2/book/search?q={}&start={}&count={}
# isbn搜索
http://t.yushu.im/v2/book/search/isbn/{isbn}

视图函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@app.route("/search/<q>/<page>")
def search(q,page):
"""
q: 关键字或者isbn
page: 第几页
?q=金庸&page=1
"""
# isbn isbn13 由13个0-9在数字组成
# isbn10 由10表0-9表数字组组成,中间可能包含' - '

isbn_or_key = 'key'
if len(q) == 13 and q.isdigit():
isbn_or_key = 'isbn'
short_q = q.replace('-', '')
if '-' in q and len(short_q) == 10 and short_q.isdigit():
isbn_or_key = 'isbn'
pass

多个逻辑判断排列原则:
1.大部分判断结果为假的条件应该放在前面;
2.需要查询数据库的操作由于会消耗资源,应该尽量靠后

视图函数重构

如果判断非常多的话,这个视图函数看起来会非常臃肿,而且阅读性很差,
所以关键字还是isbn的判断完全可以提取出来

helper.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def is_isbn_or_key(word):
"""
旧版: 2-02-033598-0 十个数字和三个-
新版: 9787544253994 十三个数字
:param word:
:return:
"""
isbn_or_key = 'key'
if len(word) == 13 and word.isdigit():
isbn_or_key = 'isbn'
short_word = word.replace('-', '')
if '-' in word and len(word) == 10 and short_word.isdigit:
isbn_or_key = 'isbn'
return isbn_or_key

fisher.py

1
2
3
4
5
6
7
8
@app.route("/search/<q>/<page>")
def search(q, page):
"""
搜索书籍路由
:param q: 关键字 OR isbn
:param page: 页码
"""
isbn_or_key = is_isbn_or_key(q)

视图函数作为web项目的入口,不应该有太多的细节,阅读性会变差

requests获取书籍信息

httper.py

1
2
3
4
5
6
7
8
9
import requests


class HTTP:
def get(self, url, return_json=True):
r = requests.get(url=url)
if r.status_code != 200:
return {} if return_json else ''
return r.json() if return_json else r.text

url的返回值可以通过浏览器请求进行查看

封装一个YuShuBook来进行确定发送哪个请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class YuShuBook:

search_by_isbn_url = "http://t.yushu.im/v2/book/search/isbn/{}"

search_by_key_url = "http://t.yushu.im/v2/book/search?q={}&count={}&start={}"

@classmethod
def search_by_isbn(cls, isbn):
url = cls.search_by_isbn_url.format(isbn)
return HTTP.get(url)

@classmethod
def search_by_key(cls, q, count=15, start=0):
url = cls.search_by_key_url.format(q, count, start)
return HTTP.get(url)

在视图函数中将结果进行返回,并声明返回的类型application/json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@app.route("/book/search/<q>/<page>")
def search(q, page):
"""
搜索书籍路由
:param q: 关键字 OR isbn
:param page: 页码
"""
isbn_or_key = is_isbn_or_key(q)
if isbn_or_key == 'isbn':
result = YuShuBook.search_by_isbn(q)
else:
result = YuShuBook.search_by_key(q)

return json.dumps(result), 200, {'content-type': 'application/json'}

关于响应: return (response, status, headers)
可以使用flask提供的jsonify替换响应

1
return jsonify(result)

视图函数拆分

拆分之后可以再一个模块中专注于这个模块的业务逻辑,也防止了代码太长,难以维护的缺点
入口文件的意义比较独特,会启动web服务器以及做很多初始化的操作,不应该和业务逻辑放在一块

fisher.py

1
2
3
4
5
6
7
8
9
10
11
from flask import Flask

# 为了可以注册book.py中的路由
from app.web import book

app = Flask(__name__)

app.config.from_object("config")

if __name__ == "__main__":
app.run(host=app.config["HOST"], debug=app.config["DEBUG"], port=app.config["PORT"])

book.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from flask import jsonify

from helper import is_isbn_or_key
from yushu_book import YuShuBook

# 为了让book.py模块可以使用app对象
from fisher import app


@app.route("/book/search/<q>/<page>")
def search(q, page):
"""
搜索书籍路由
:param q: 关键字 OR isbn
:param page: 页码
"""
isbn_or_key = is_isbn_or_key(q)
if isbn_or_key == 'isbn':
result = YuShuBook.search_by_isbn(q)
else:
result = YuShuBook.search_by_key(q)
return jsonify(result)

这样拆分后,在浏览器中进行访问,会显示404
这里需要了解一下flask的路由机制
flask内部维护了一个关于视图函数的字典,每一个url对应一个视图函数
每一个url还对应一个endpoint,用于反向构建url(url_for)
flask路由注册源码

1
2
def add_url_rule(self, rule, endpoint=None, view_func=None,
provide_automatic_options=None, **options):

参数中有一个endpointview_func
部分源码

1
2
3
4
if endpoint is None:
endpoint = _endpoint_from_view_func(view_func)
options['endpoint'] = endpoint
methods = options.pop('methods', None)

1
2
3
4
5
6
7
8
9
10
11
12
13
# Add the required methods now.
methods |= required_methods

rule = self.url_rule_class(rule, methods=methods, **options)
rule.provide_automatic_options = provide_automatic_options

self.url_map.add(rule)
if view_func is not None:
old_func = self.view_functions.get(endpoint)
if old_func is not None and old_func != view_func:
raise AssertionError('View function mapping is overwriting an '
'existing endpoint function: %s' % endpoint)
self.view_functions[endpoint] = view_func

Flask内部由url_map维护一个url->endpoint的指向,
view_functions记录endpoint所指向视图函数的函数,
这样请求进入到Flask内部,才能通过Url找到对应的视图函数

循环引入

循环引入分析
图中有两种颜色的线:红色的线是fisher主执行文件被执行之后的执行路径;蓝色的线是book块被导入之后循环导入的执行路径.
1.主流程开始之后,首先到达导入book的语句.然后进入book模块中执行
2.book模块开始之后,首先到达导入fisher的语句(循环导入),这个时候主流程暂时结束,重新执行fisher中的代码
3.这时候又回到fisher中的导入book的语句,由于book已经被导入一次,所以不会再次导入,进入if语句,这个时候的name是book导入fisher时候的name:fisher,不是主流程main,所以if语句条件为false.蓝色线执行终止,重新回到2. book导入fisher的语句.
4.继续向下执行book 中app.route注册路由的语句.然后book执行完,回到fisher主流程执行中.
5.到达if语句,这个时候name为main.执行run方法,启动服务
回答流程图中的两个问题:
问题1:因为都是由fisher引入book,一个模块只会引入另一个模块一次.所以只执行了一次book
问题2:由于一次是主流程执行fisher文件;一次是由book模块导入 fisher.

找不到视图函数的最终解释和证明

整个流程中,出现了两次核心app对象的初始化,注册路由是在蓝色流程中初始化的app注册的.但是启动服务是红色流程中的app启动的
book中注册路由所使用的app对象,是他自己所导入fisher模块的app对象(蓝色流程中),而不是红色主流程中所实例化的app对象

book.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
print("id为"+str(id(app))+"的app注册路由")


@app.route("/book/search/<q>/<page>")
def search(q, page):
"""
搜索书籍路由
:param q: 关键字 OR isbn
:param page: 页码
"""
isbn_or_key = is_isbn_or_key(q)
if isbn_or_key == 'isbn':
result = YuShuBook.search_by_isbn(q)
else:
result = YuShuBook.search_by_key(q)
return jsonify(result)

fisher.py

1
2
3
4
5
6
7
8
9
10
11
app = Flask(__name__)
print("id为"+str(id(app))+"的app实例化")

app.config.from_object("config")

# 为了可以注册book.py中的路由
from app.web import book

if __name__ == "__main__":
print("id为" + str(id(app)) + "的app启动")
app.run(host=app.config["HOST"], debug=app.config["DEBUG"], port=app.config["PORT"])

执行结果

1
2
3
4
id为2152933096808的app实例化
id为2152950733960的app实例化
id为2152950733960的app注册路由
id为2152933096808的app启动

可以看到注册路由的app,和启动服务的app不是同一个app.并且最后启动的app是最先实例化的app,也就是红色主流程的app;而注册路由的app是后实例化的app,也就是由book导入fisher模块的蓝色流程的app

应用、蓝图与视图函数

Flask的层级关系

  • Flask最上层是app核心对象
  • 在这个核心对象上可以插入很多蓝图,这个蓝图是不能单独存在的,必须将app作为插板插入app
    在每一个蓝图上,可以注册很多静态文件,视图函数,模板
  • 一个业务模块可以做为一个蓝图,比如book,user.可以把视图函数注册到蓝图上再插入app.以此来达到之前分文件的目的
  • 之前的book.py 放到了app/web/路径下,就是考虑到了蓝图.app属于是整个Flask应用层.web属于是蓝图

代码规范化

应该讲一些初始化工作,放在对应层级的包的初始化文件 init.py 中.比如Flask核心应用app对象初始化应该放在应用层级app包的 init.py 中.蓝图的初始化,应该放在对应蓝图层级web包的init.py中,并且所有蓝图对应的试图函数都应该放在web目录下

app/init.py

1
2
3
4
5
6
7
from flask import Flask

def create_app():
app = Flask(__name__)

app.config.from_object("config")
return app

fisher.py

1
2
3
4
5
6
7
from app import create_app

app = create_app()


if __name__ == '__main__':
app.run()

蓝图

app/web/book.py

1
2
3
4
5
web = Blueprint('web', __name__)

@web.route("/book/search/<q>/<page>")
def search(q, page):
...

将蓝图注册到flask核心app中

1
2
3
4
5
6
7
8
9
def create_app():
app = Flask(__name__)
app.config.from_object("config")
register_blueprint(app)
return app

def register_blueprint(app):
from app.web.book import web
app.register_blueprint(web)

拆分蓝图

蓝图的出发点是分模块,类似于django中的include函数,将视图函数进行分类处理

web/init.py

1
2
3
4
5
from flask import Blueprint

web = Blueprint('web', __name__)

from app.web import book

book.py

1
2
3
4
5
6
7
8
9
10
11
from flask import jsonify, Blueprint

from . import web
from helper import is_isbn_or_key
from yushu_book import YuShuBook



@web.route("/book/search/<q>/<page>")
def search(q, page):
...

Request对象

https://dormousehole.readthedocs.io/en/latest/api.html#incoming-request-data
书籍的搜索在前端代码中是以表单的形式提交的
如果想要获取表单中的数据,就要用到这个request了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from flask import jsonify, Blueprint, request

@web.route("/book/search")
def search():
"""
搜索书籍路由
:param q: 关键字 OR isbn
:param page: 页码
"""
q = request.args.get('q', '')
page = request.args.get('page', '')
isbn_or_key = is_isbn_or_key(q)
if isbn_or_key == 'isbn':
result = YuShuBook.search_by_isbn(q)
else:
result = YuShuBook.search_by_key(q)
return jsonify(result)

WTForms参数验证

WTForms 是一款优秀的参数验证框架.可以将参数验证抽离出一个模块.与业务代码解耦.

1
pip install WTForms

使用WTForms需要自定义一个类继承wtforms提供的Form类,然后定义参数校验规则
app/forms/book.py

1
2
3
4
5
6
from wtforms import Form, StringField, IntegerField
from wtforms.validators import DataRequired,Length,NumberRange

class SearchForm(Form):
q = StringField(validators=[DataRequired(), Length(min=1, max=30, message="关键字长度必须在1~30之间")])
page = IntegerField(validators=[NumberRange(min=1, max=99)], default=1)

book.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@web.route("/book/search")
def search():
"""
搜索书籍路由
:param q: 关键字 OR isbn
:param page: 页码
"""
form = SearchForm(request.args)
if form.validate():
q = form.q.data.strip()
page = form.page.data
isbn_or_key = is_isbn_or_key(q)
if isbn_or_key == 'isbn':
result = YuShuBook.search_by_isbn(q)
else:
result = YuShuBook.search_by_key(q)
return jsonify(result)
else:
return jsonify(form.errors)

自定义form表单中的参数要和前端代码中的参数一致,这样才能使用request.args进行构造

配置文件拆分

某些数据比较隐私,不适合公开,需要在创建一个配置文件来保存这些数据

在之前的代码中使用了count,start两个参数来控制返回的数据,不应该直接写死的,而且需要接受的参数应该是page,考虑到代码的封装性,应该尽可能的隐藏细节,应该把计算count,start的过程放到YuShuBook的search_by_key方法中来做

虽然计算start的方法很简单.但是这是一个单独的逻辑过程,不应该将这段过程放在访问api获取数据的方法中.而应该封装成一个方法,以方法名来代替这段逻辑

yushu_book.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class YuShuBook:

search_by_isbn_url = "http://t.yushu.im/v2/book/search/isbn/{}"

search_by_key_url = "http://t.yushu.im/v2/book/search?q={}&count={}&start={}"

@classmethod
def search_by_isbn(cls, isbn):
url = cls.search_by_isbn_url.format(isbn)
return HTTP.get(url)

@classmethod
def search_by_key(cls, q, page=1):
url = cls.search_by_key_url.format(q, current_app.config['PRE_PAGE'], cls.calculate_start(page))
return HTTP.get(url)

@staticmethod
def calculate_start(page):
return (page -1)*current_app.config['PRE_PAGE']

config.py

1
PRE_PAGE = 15

app/init.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from flask import Flask

import config, secure


def create_app():
app = Flask(__name__)

app.config.from_object(config)
app.config.from_object(secure)
register_blueprint(app)

return app

def register_blueprint(app):
from app.web import web
app.register_blueprint(web)

目录重构

数据表创建方式

database first

是最普标的.直接在数据库中编写DML语句,建表.

model first

使用建模工具,根据绘制的数据模型,生成数据表.

code first

在代码中创建业务模型(实体类),自动反向生成数据表.
可以专注业务模型的设计,而不是数据库的设计
不需要关心数据库表以及数据库表是如何创建的,简化思维逻辑
数据库只是用来存储数据的,表之间的关系应该有业务来决定

ORM与Code first的区别

Code first关注的是相关的数据表是怎么创建的,他解决的是创建数据的问题
ORM(Object relation Map)不仅仅是解决数据创建的问题,还包含了数据的查询,更新,添加,删除.ORM希望我们通过操作一个个模型来间接操作数据库,所以说他的范围是更加广阔的.我们后面的所有的数据库操作都是通过ORM来操作的

定义模型类

新建一个模块model,用于存储数据库表对应的业务模型,在编写model层的模型时,一定要忘记数据库表,重点要放在业务模型的抽象中来

sqlalchemy 是一个类库,用于根据定义的model反向生成数据库表
Flask_SqlAlchemy是Flask在sqlalchemy基础上封装的一个组件.提供了更加人性化的API来操作数据库
安装: pip install flask-sqlalchemy

app/models/book.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from sqlalchemy import Column, String, Integer
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()


class Book(db.Model):
id = Column(Integer, primary_key=True, autoincrement=True)
title = Column(String(50), nullable=True)
author = Column(String(30), default="佚名")
binding = Column(String(20))
publisher = Column(String(50))
price = Column(String(20))
pages = Column(Integer)
isbn = Column(String(15), nullable=True, unique=True)
summary = Column(String(1000))
image = Column(String(50))

app/init.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from flask import Flask

from app import config, secure
from app.models.book import db


def create_app():
app = Flask(__name__)

app.config.from_object(config)
app.config.from_object(secure)
register_blueprint(app)
db.init_app(app=app)
db.create_all(app=app)
return app

使用create_all来创建表,需要传入app参数,不传入会报错
这个函数不是通过浏览器访问请求才执行的,
flask不会自动的将app加入栈中,需要我们自己将其添加入栈

数据库连接配置

这里使用cymysql作为连接驱动
pip install cymysql

app/secure.py

1
2
# URI规则:数据库类型+驱动://账号:密码@host:port/dbname
SQLALCHEMY_DATABASE_URI = 'mysql+cymysql://root:123456@localhost:3306/fisher'

flask核心机制

flask中经典错误 working outside application context

在前面数据库的创建中,我们使用db.create_all(app=app)来避免错误,下面来研究一下这个错误的原因

写一段测试代码

1
2
3
4
5
6
7
8
9
from flask import Flask, current_app

app = Flask(__name__)

# 断点调试这里显示current_app=[LocalProxy]<LocalProxy unbound>
a = current_app

# RuntimeError: Working outside of application context.
b = current_app.config["DEBUG"]

通过current_app获取配置文件中的内容,代码看似没有问题,却抛出了错误
断点调试时发现current_app并不是一个Flask对象,而是一个unbound的LocalProxy

之前的requestcurrent_app都是LocalProxy

1
2
3
4
5
6
7
# context locals
_request_ctx_stack = LocalStack()
_app_ctx_stack = LocalStack()
current_app = LocalProxy(_find_app)
request = LocalProxy(partial(_lookup_req_object, 'request'))
session = LocalProxy(partial(_lookup_req_object, 'session'))
g = LocalProxy(partial(_lookup_app_object, 'g'))

AppContext、RequestContext、Flask与Request之间的关系

AppContext、RequestContext

Flask有两个上下文,应用上下文-AppContext和请求上下文-RequestContext.他们本质都是对象,是一种封装.应用上下文是对Flask的封装,请求上下文是对Request的封装

下面我们来通过源码,了解一下这两个上下文.
在源码中找到这两个对象

阅读AppContext和RequestContext的构造函数,发现他们都将核心对象app作为了他们的一个属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# AppContext
def __init__(self, app):
self.app = app
self.url_adapter = app.create_url_adapter(None)
self.g = app.app_ctx_globals_class()

# Like request context, app contexts can be pushed multiple times
# but there a basic "refcount" is enough to track them.
self._refcnt = 0


# RequestContext
def __init__(self, app, environ, request=None):
self.app = app
if request is None:
request = app.request_class(environ)
self.request = request
self.url_adapter = app.create_url_adapter(self.request)
self.flashes = None
self.session = None
...

并且都有四个相同的方法

1
2
3
4
5
6
7
8
def push(self):
...
def pop(self, exc=_sentinel):
...
def __enter__(self):
...
def __exit__(self, exc_type, exc_value, tb):
...

为什么需要上下文

为什么需要上下文,直接操作Flask的核心对象app不可以吗?

这是一个设计思想.有时候呢,我们不光需要这个核心对象app,还需要他外部的一些东西,这个时候,我们可以他们统一结合封装到一起,组装成一个新的上下文对象,并且在这个对象之上,可以提供一些新的方法,如我们上面所提到的push、pop等

对AppContext、RequestContext、Flask与Request的意义做出一个解释

  • Flask:核心对象,核心对象里承载了各种各样的功能,比如保存配置信息,再比如注册路由试图函数等
  • AppContext:对Flask的封装,并且增加了一些额外的参数
  • Request:保存了请求信息,比如url的参数,url的全路径等信息
  • RequestContext:对Request的封装

我们在实际编码过程中,可能是需要Flask或者Request的信息的,但是这并不代表我们应该直接导入这两个对象获取相关信息,正确的做法是从AppContext,RequestContext中间接的获得我们需要的信息

即使这样,我们也没有必要导入Context去使用上下文,这就回到了current_app和request这些LocalProxy,他们提供了间接操作上下文对象的能力,使用了代理模式

详解flask上下文与出入栈

  1. 当一个请求进入Flask框架,首先会实例化一个RequestContext,这个上下文封装了请求的信息在Request中,并将这个上下文推入到栈_request_ctx_stack中,即之前将的push方法
  2. RequestContext在入_request_ctx_stack之前,首先会检查_app_ctx_strack是否为空,如果为空,则会把一个AppContext的对象入栈,然后再将这个请求入栈到_request_ctx_stack
  3. 我们的current_apprequest对象都是永远指向_app_ctx_strack_request_ctx_stack的栈顶元素,也就是分别指向了两个上下文,如果这两个值是空的,那么LocalProxy就会出现unbound的状态
  4. 当请求结束的时候,这个请求会出栈-pop

回到之前的代码,如果想要测试代码正常运行,就需要手动将一个AppContext入栈

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from flask import Flask, current_app

app = Flask(__name__)

# 获取AppContext,里面的代码很简单,就是:return AppContext(self)
ctx = app.app_context()
# 将AppContext入栈
ctx.push()
# 断点调试这里显示current_app=[LocalProxy]<LocalProxy unbound>
a = current_app

# RuntimeError: Working outside of application context.
b = current_app.config["DEBUG"]
print(b)

虽然current_app和request指向的是两个上下文,但是他们返回的却是Flask核心对象和Request对象.下面来看下这部分的源码
globals.py

1
2
3
4
5
def _find_app():
top = _app_ctx_stack.top
if top is None:
raise RuntimeError(_app_ctx_err_msg)
return top.app

可以看到,获取的依然是app核心对象

flask上下文与with语句

在前面通过手动将app推入栈,弹出栈的方式,解决了working outside application context的问题.实际上更经典的做法是使用with语句来完成.

首先使用with语句替换之前的代码

1
2
3
4
5
app = Flask(__name__)

with app.app_context():
a = current_app
b = current_app.config["DEBUG"]

什么时候可以使用with语句:

  • 实现了上下文协议的对象,可以使用with语句
  • 对于实现了上下文协议的对象,我们通常称为上下文管理员
  • 通过实现enterexit来实现上下文协议
  • 上下文表达式必须返回一个上下文管理器

对于上面一段代码来说,AppContext就是上下文管理器;app.app_context()就是上下文表达式.__enter__中做了push操作,__exit__中做了pop操作.
所以只要进入with语句,current_app就是有值的,一旦离开了with语句,current_app
就会弹出,然后就又没有值了(又变成了unbound).

1
2
3
4
5
6
7
8
9
def __enter__(self):
self.push()
return self

def __exit__(self, exc_type, exc_value, tb):
self.pop(exc_value)

if BROKEN_PYPY_CTXMGR_EXIT and exc_type is not None:
reraise(exc_type, exc_value, tb)

通过数据库的链接和释放来理解with语句的具体含义

连接数据库的操作步骤:

  1. 连接数据库
  2. sql或者其他的业务逻辑
  3. 释放资源
    如果上面的第二部分出错,那么第三部分的释放资源就不会被执行,资源就会一直被占用.
    解决这个问题的通常做法是使用try-except-finally
    但是在python中更优雅的方式就是使用with语句中.我们可以把连接数据库的操作写在上下文管理器的__enter__方法里面,把业务代码写在with语句的代码块里面,把释放资源的语句写在__exit__里面.

读写文件的具体例子

一般写法

1
2
3
4
5
try:
f = open(r'/Users/test.txt')
print(f.read())
finally:
f.close()

使用with语句写法

1
2
with open('test.txt') as f:
print(f.read())

with返回的并不是上下文管理器,而是__enter__方法的返回值

上面一段代码我们在__enter__中返回了一个a,所以下面as后的obj_a就是1

exit方法接收三个参数,分别是异常类型,异常消息,和详细的异常堆栈信息
exit方法需要返回一个boolean类型的值,如果返回True,那么外部就不会抛出异常,如果返回False,那么还会在外部抛出异常,如果什么都不返回,按照False处理.

阅读源码解决db.create_all的问题

在前面flask_sqlalchemy已经注册了app对象,但是create_all方法还是需要传入app参数,不传就会报错
这里先看一下init_app源码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def init_app(self, app):
"""This callback can be used to initialize an application for the
use with this database setup. Never use a database in the context
of an application not initialized that way or connections will
leak.
"""
if (
'SQLALCHEMY_DATABASE_URI' not in app.config and
'SQLALCHEMY_BINDS' not in app.config
):
warnings.warn(
'Neither SQLALCHEMY_DATABASE_URI nor SQLALCHEMY_BINDS is set. '
'Defaulting SQLALCHEMY_DATABASE_URI to "sqlite:///:memory:".'
)

app.config.setdefault('SQLALCHEMY_DATABASE_URI', 'sqlite:///:memory:')
app.config.setdefault('SQLALCHEMY_BINDS', None)
app.config.setdefault('SQLALCHEMY_NATIVE_UNICODE', None)
app.config.setdefault('SQLALCHEMY_ECHO', False)
app.config.setdefault('SQLALCHEMY_RECORD_QUERIES', None)
app.config.setdefault('SQLALCHEMY_POOL_SIZE', None)
app.config.setdefault('SQLALCHEMY_POOL_TIMEOUT', None)
app.config.setdefault('SQLALCHEMY_POOL_RECYCLE', None)
app.config.setdefault('SQLALCHEMY_MAX_OVERFLOW', None)
app.config.setdefault('SQLALCHEMY_COMMIT_ON_TEARDOWN', False)
track_modifications = app.config.setdefault(
'SQLALCHEMY_TRACK_MODIFICATIONS', None
)

if track_modifications is None:
warnings.warn(FSADeprecationWarning(
'SQLALCHEMY_TRACK_MODIFICATIONS adds significant overhead and '
'will be disabled by default in the future. Set it to True '
'or False to suppress this warning.'
))

app.extensions['sqlalchemy'] = _SQLAlchemyState(self)

@app.teardown_appcontext
def shutdown_session(response_or_exc):
if app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN']:
if response_or_exc is None:
self.session.commit()

self.session.remove()
return response_or_exc

create_app源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def _execute_for_all_tables(self, app, bind, operation, skip_tables=False):
app = self.get_app(app)

if bind == '__all__':
binds = [None] + list(app.config.get('SQLALCHEMY_BINDS') or ())
elif isinstance(bind, string_types) or bind is None:
binds = [bind]
else:
binds = bind

for bind in binds:
extra = {}
if not skip_tables:
tables = self.get_tables_for_bind(bind)
extra['tables'] = tables
op = getattr(self.Model.metadata, operation)
op(bind=self.get_engine(app, bind), **extra)

def create_all(self, bind='__all__', app=None):
"""Creates all tables.

.. versionchanged:: 0.12
Parameters were added
"""
self._execute_for_all_tables(app, bind, 'create_all')

可以看到create_all方法调用了_execute_for_all_tables私有方法,_execute_for_all_tables里面第一行的get_app方法用来获取一个app核心对象

get_app源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def get_app(self, reference_app=None):
"""Helper method that implements the logic to look up an
application."""

if reference_app is not None:
return reference_app

if current_app:
return current_app._get_current_object()

if self.app is not None:
return self.app

raise RuntimeError(
'No application found. Either work inside a view function or push'
' an application context. See'
' http://flask-sqlalchemy.pocoo.org/contexts/.'
)

在这个方法中,通过三种方式来判断是否存在核心app,所以解决的方法也有三种

  1. create_all中传入关键字参数app
  2. 向堆栈中推入一条app_context,使得current_app不为空
  3. 在初始化flask_sqlalchemy对象的时候,传入app参数

具体选取哪种方式,是根据情况而定的,比如我们当前的情况,就不合适使用第三种方法,因为我们的flask_sqlalchemy对象是在models中的book.py中的,如果用第三种方式,还需要在这里导入app对象.
db = SQLAlchemy(app)

Flask中的多线程与线程隔离技术

进程(竞争计算机资源的基本单位)

  • 操作系统用来调度、分配资源的单位,每个应用程序至少有一个进程
  • CPU在某一时间节点只能够执行一个应用程序,CPU可以在不同的应用程序进程之间切换,
  • 进程/线程,切换时开销是非常大的,因为上下文是需要保存和加载消耗大

线程(是进程的一部分,可以只有一个进程 多个进程)

  • 线程之间切换所消耗的资源更小,线程非常轻量,本身不负责管理资源也不用有资源,所以线程是去使用进程的相关资源的,所以让线程切换起来更加快速
  • 进程 分配资源
  • 线程 利用CPU执行代码
     代码 指令 CPU来执行 资源,线程没有资源,有指令
  • 线程 自己不拥有资源,但是可以访问进程的资源

多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import threading


def worker():
print('i am thread')
t = threading.current_thread()
print(t.getName())


t = threading.current_thread()
print(t.getName())

new_t = threading.Thread(target=worker, name='qiyue_thread')
new_t.start()

运行结果

1
2
3
MainThread
i am thread
_thread

修改以上代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import time
import threading


def worker():
print('i am thread')
t = threading.current_thread()
time.sleep(5)
print(t.getName())


new_t = threading.Thread(target=worker, name='_thread')
new_t.start()


t = threading.current_thread()
print(t.getName())

结果没有任何改变,只是_thread线程会休眠5秒后再打印结果

多线程编程好处

  • 更加充分地利用CPU的性能优势,从而加快代码的执行速度
  • 异步编程
  • 单核CPU,同一时间只允许一个线程来使用CPU执行代码
  • 多核CPU,完全有能力让不同的核去处理不同的线程,并行执行程序
  • python不能充分利用多核CPU的优势

全局解释器锁GIL(global interpreter lock)

锁:线程安全,多个线程共享一个进程的资源,可能多个线程同时访问一个资源,造成 线程不安全一般,为了保证线程安全,采用锁的机制.一旦对某个资源进行了加锁操作,只有拿到锁的线程才能对资源进行操作,执行完被释放之后才能被别的线程使用

  • 细粒度锁 程序员 主动加锁
  • 粗粒度锁 解释器 GIL
  • 虽然多核CPU可以跑多个线程,但是python是需要解释器来解释的,由于GIL的存在,在python解释器上面,同一时刻只允许一个线程来执行
  • 总结:CPU硬件是没有限制的,但是由于解释器,GIL只允许一个线程同时执行,一定程度上保证线程安全.可以采用多进程,可是多进程之间是不能互相访问的,如果想在进程之间共享资源,需要用到进程通信技术,相当麻烦,切换成本高

对于IO密集型程序,多线程有意义

  • CPU密集型程序:非常严重的依赖CPU计算(圆周率计算、视频解码)
  • IO密集型程序:查询数据库、请求网络资源、读写文件,按照时间段消耗在那种操作上面来划分的
  • IO密集型主要花费在等待上面,不如让别的线程来使用CPU

开启flask多线程

  • 通过webserver开启多线程app.run(host='0.0.0.0', debug=app.config['DEBUG'], port=5000, threaded=True)
  • 单进程多线程
    • 对象是保存状态的地方.实例化三个不同的request对象分别用来保存三个请求的状态
      字典,用不同线程id号来线程隔离
    • 多线程,每个线程都有唯一标识,唯一标识为key,每个线程所实例化的Request对象作为value值
    • 一个request对象,一个request对象指向了一个字典的数据结构,字典包含了不同线程所创建的不同的Request实例化对象
  • 线程隔离对象
    • flask内部引用了werkzeug库,库内部有local模块,local模块有Local对象,线程隔离由Local对象完成,通过字典的方式
    • 1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      import time
      import threading

      from werkzeug.local import Local


      my_obj = Local()
      my_obj.b = 1


      def worker():
      my_obj.b = 2
      print('in new thread b is:' + str(my_obj.b))


      new_t = threading.Thread(target=worker, name='_thread')
      new_t.start()
      time.sleep(1)

      # 主线程
      print('in main thread b is:' + str(my_obj.b))

线程隔离的栈

  • 两个上下文,一个请求上下文,一个应用上下文,会在请求进来的时候被推进到栈中,_app_ctx_stack_request_ctx_stack这两个变量名所指向的对象都是LocalStack()这样的类型,就是可以用来做线程隔离的栈
  • LocalStack Local 字典
    • Local使用字典的方式实现的线程隔离
    • LocalStack封装了线程对象,把Local对象作为它自己的一个属性,从而实现的一个线程隔离的栈结构
  • LocalStack基本用法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    >>> ls = LocalStack()
    >>> ls.push(42)
    >>> ls.top
    42
    >>> ls.push(23)
    >>> ls.top
    23
    >>> ls.pop()
    23
    >>> ls.top
    42

LocalStack作为线程隔离对象的意义

简单说就是在数据结构方面限制了某些能力

  • flask为什么需要栈结构
    • 需要栈结构,将两个上下文推到栈中去
  • 为什么需要LocalStack
    • 需要栈是线程隔离的
  • 线程隔离意义
    • 使当前线程能够正确引用到他自己所创建的对象,而不是引用到其他线程所创建的对象
    • 用一个变量名(request)同时指向多个线程所创建的多个实例化对象是不可能的
    • 可以做到,在当前线程,在引用request(变量名)可以正确找到当前线程它自己所实例化的Request对象
  • 请求上下文包含Request对象,所以,Request也是被线程隔离的session也是线程隔离

知识梳理

线程隔离对象
– LocalStack和local是线程隔离对象
被线程隔离的对象
– 通过线程隔离对象来创建被线程隔离的对象
flask的核心对象app作为一个属性存在于AppContext的应用上下文下
– 核心对象app只有一个,是在入口文件创建,在主线程中被创建
多线程编程难点
– 线程安全
– 线程隔离

总结

  • Local->LocalStack,线程隔离对象实现
    • Local内部有一个字典,以线程ID号作为key
    • LocalStack如何实现?LocalStack封装了Local
    • 操作Local,通常使用.去访问下面的属性;使用LocalStack,需要使用那几个常用的方法和属性,push、pop、top
  • AppContext->RequestContext
    • 请求进来,会被推入到LocalStack的栈中去,同时在请求结束时,AppContextRequestContext会被pop弹出去
  • Flask->AppContext Request->RequestContext
    • AppContext重要特点,将Flask核心对象作为它的一个属性,保存了起来
    • RequestContext请求上下文,将请求对象Request封装和保存
  • current_app->(LocalStack.top=Appcontext top.app=Flask)
    • current_app指向的是LocalStack下面的栈顶元素的一个属性,也就是top.app,Flask的核心对象,栈顶元素为应用上下文
  • request->(LocalStack.top=RequestContext top.request=Request)
    • request实际指向的是LocalStack栈顶元素下面的Request请求对象

书籍详情页面的构建

ViewModel的基本概念


大多时候,我们从数据库,或者外部网络获取到的原始数据,并不能满足复杂的业务需求.业务的直观体现就是页面.

  • 可能有的页面不需要全部的字段
  • 可能有的页面需要改变一些数据,如给书名加上《》
  • 可能有的页面需要多种源数据组合到一起.

为了满足各种各样复杂的业务需求,我们抽离出一个ViewModel层,为每一个页面定制一个专属ViewModel来适配.综上所述,ViewModel有三个方面的作用

  • 裁剪
  • 修饰
  • 合并

这三个作用并不一定在每个ViewModel上只出现一次,可能会组合使用.

使用ViewModel处理书籍数据

http://yushu.im
通过书籍搜索页面的观察,我们可以分析出,这个页面需要返回的有三部分数据
1.书籍详情信息
2.搜素关键词
3.总记录数

而我们现在的原始数据里
1.数据详情数据载ISBN搜索和关键词搜索返回的数据格式不统一
2.原始数据里并没有搜索关键词,需要手动添加
3.ISBN搜索只返回0或1条数据,需要手动计算出总记录数

这些都可以在ViewModel层做统一处理.
view_models/book.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class BookViewModel:
@classmethod
def package_single(cls, data, keyword):
returned = {
'book': [],
'keyword': keyword,
'total': 0
}
if data:
returned['total'] = 1
returned['book'] = [cls.__cut_book_data(data)]

return returned

@classmethod
def package_collection(cls, data, keyword):
returned = {
'book': [],
'keyword': keyword,
'total': 0
}
if data:
returned['total'] = data['total'],
returned['book'] = [cls.__cut_book_data(book) for book in data["books"]]
return returned

@classmethod
def __cut_book_data(cls, data):
book = {
'title': data['title'],
'publisher': data['publisher'],
'pages': data['pages'] or '',
'author': '、'.join(data['author']),
'price': data['price'],
'summary': data['summary'] or '',
'image': data['image']
}
return book

伪面向对象:披着面向对象外衣的面向过程
仔细分析我们的BookViewModel,YuShuBook两个类.可以发现,实际上这两个类的编写根本就不是遵循面向对象设计方法.

面向对象设计方法要求一个类或对象,应该可以描述特征和行为.描述特征的即类变量,实例变量;描述行为的就是方法.

但是我们的BookViewModel只有方法,没有特征.虽然我们给他们包上了class:这层华丽的外衣,但是他的本质依旧是面向过程的(以函数为核心).

如何判断一个类是否遵循了面向对象原则.
看是否有大部分方法可以被编辑为类方法或者静态方法

重构鱼书核心对象

重构思路:
首先来看YuShuBook.之所以YuShuBook会出现大量的classmathod,原因就在于:YuShuBook并不会保存数据,而是把所有数据都返回给了调用方去;换句话说YuShuBook是个伪面向对象(伪类),因为他只是包装了一系列的方法,而没有存储这个类的特征,数据.

既然一个类或者对象应该有自己的特征和数据,那么我们就没有必要通过方法的形式把本应该用来描述这个列的特征的数据返回回去,而是保存在YuShuBook这个类中.

YuShuBook关键词缀在Book中,依此可以知道,我们这个类是用来描述书籍的,而search_by_isbn,search_by_key这些方法返回的就是关于书籍的数据,所以我们完全可以用这个方法所返回的数据来描述类本身,把他存储到类的内部,而不是返回到外部去(保持类的内聚性)

重构后的代码
app/spider/yushu_book.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from flask import current_app

from app.libs.httper import HTTP


class YuShuBook:
search_by_isbn_url = "http://t.yushu.im/v2/book/isbn/{}"

search_by_key_url = "http://t.yushu.im/v2/book/search?q={}&count={}&start={}"

def __init__(self):
self.total = 0
self.books = []

def search_by_isbn(self, isbn):
url = self.search_by_isbn_url.format(isbn)
result = HTTP.get(url)
self.__fill_single(result)

def search_by_key(self, q, page=1):
url = self.search_by_key_url.format(q, current_app.config["PRE_PAGE"],
self.calculate_start(page))
result = HTTP.get(url)
self.__fill_collection(result)

def __fill_single(self, data):
if data:
self.books = [data]
self.total = 1

def __fill_collection(self, data):
self.books = data['books']
self.total = data['total']

def calculate_start(self, page):
return (page - 1) * current_app.config["PRE_PAGE"]

app/view_models/book.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 具体的单一数据处理封装在BookViewModel里
class BookViewModel:
def __init__(self, book):
self.title = book['title']
self.publisher = book['publisher']
self.pages = book['pages']
self.author = book['author']
self.price = book['price']
self.summary = book['summary']
self.image = book['image']


# BookCollection只负责集合的处理
class BookCollection:

def __init__(self):
self.keyword = ''
self.total = 0
self.books = []

def fill(self, yushu_book, keyword):
self.keyword = keyword
self.total = yushu_book.total
self.books = [BookViewModel(book) for book in yushu_book.books]

app/web/book.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@web.route("/book/search/")
def search():
"""
搜索书籍路由
:param q: 关键字 OR isbn
:param page: 页码
"""
form = SearchForm(request.args)
if not form.validate():
return jsonify(form.errors)

q = form.q.data.strip()
isbn_or_key = is_isbn_or_key(q)

books = BookCollection()
yushu_book = YuShuBook()

if isbn_or_key == 'isbn':
yushu_book.search_by_isbn(q)
else:
page = form.page.data
yushu_book.search_by_key(q, page)

books.fill(yushu_book, q)
return jsonify(books)

在重构的时候,我们没有在类中保存查询参数,如果保存了查询参数,在代码层面是没有问题的,但是存在一个问题就是:YuShuBook的作用是隐藏了数据的具体获取方式,也就是说YuShuBook足够的抽象,不需要我们去关心数据具体是怎么来的,但是如果我们把查询参数也加入到类里面来之后,他就会使得这个类更加的具体化,因为我们保持的数据更多,更加精准,甚至描述了使如何获取数据的,一般一个类太具体化就会存在一个扩展和修改的问题,假如YuShuBook有一天内部的数据来源改成从数据库里获取信息,那么这样的情况下面,我们把查询参数放在YuShuBook中反而不好了,也减少了YuShuBook的灵活性

从json序列化看代码解释权反转

重构完代码后,如果访问视图函数,是会报json转换异常的错误的,这是因为python不能直接将一个对象序列化成json字符串.下面我们来解决这个问题

虽然对象不能序列化,但是字典是可以的,而python有一个实例变量__dict__来获取一个对象所有属性组成的字典.

但是对于BookCollection这个类实例化的对象来说,不可以,因为他不是一个普通的python对象,他有一个属性是BookViewModel对象(books)

转移解释权

我们可以借助一种转移解释权的思想,我们提供一个函数,来序列化对象,当有遇到不可序列化的成员变量时,当我们不知道如何继续序列化的时候,我们可以把这个解释的过程交给函数的调用方,由函数调用方帮我们完成序列化的过程.

这种转移解释权的思想可以通过函数式编程的方式非常简单巧妙的实现:

我们在编写json.dumps()的时候,要求函数调用方传递一个函数,传递的这个函数的具体实现细节是由函数调用方来完成的,我们不需要关心,这个函数内部的具体实现细节,一旦遇到不能够序列化的类型的时候,我们就调用这个函数.将不可序列化的类型转化成可以序列化的类型.我们只需要关注return的结果.

这就实现了代码解释权的反转,代码的解释权不在由函数编写方来定义,而是把这个权利交给了函数调用方.也是设计模式中的策略模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@web.route("/book/search/")
def search():
"""
搜索书籍路由
:param q: 关键字 OR isbn
:param page: 页码
"""
form = SearchForm(request.args)
if not form.validate():
return jsonify(form.errors)

q = form.q.data.strip()
isbn_or_key = is_isbn_or_key(q)

books = BookCollection()
yushu_book = YuShuBook()

if isbn_or_key == 'isbn':
yushu_book.search_by_isbn(q)
else:
page = form.page.data
yushu_book.search_by_key(q, page)

books.fill(yushu_book, q)
return json.dumps(books, default=lambda o: o.__dict__)

单页应用与多页应用的区别

通过vue.js或者其它前端框架搭建的项目,通过webpack打包后,只有一个index页面,页面的内容完全是依靠js加载来渲染的,这样一来,当访问主页面时,浏览器会发送大量的请求,不管这些请求的内容是否是主页面需要的,如果网速差点的话,加载速度是非常慢的,体验非常差,但是主页面加载成功后,再访问这个网站的其它内容时,由于已经加载过了,所以速度非常快

多页面是请求哪个页面就返回哪个页面的内容,数据已经在服务器端渲染好了

静态文件、模板、消息闪现与Jinja2

静态文件访问原理

Flask访问静态文件非常简单,只需要在项目根目录建立static文件夹.将静态资源文件放入static下即可.访问的时候访问http://127.0.0.1:5000/static/fileName即可.
这里的根目录并不是项目的根目录fisher,而是app目录.这是因为,我们在实例化Flask核心对象的时候,传入了__name__参数,这个__name__指向的就是当前文件所在目录.static_folder指向的是当前目录下的static

1
2
3
4
5
6
7
8
9
10
11
12
13
def __init__(
self,
import_name,
static_url_path=None,
static_folder='static',
static_host=None,
host_matching=False,
subdomain_matching=False,
template_folder='templates',
instance_path=None,
instance_relative_config=False,
root_path=None
):

在初始化flask时,我们可以传递static_url_pathstatic_folder这两个参数
这两个参数决定了静态资源的访问路径

1
app = Flask(__name__, static_url_path='/xxx', static_folder='static/s/')

访问方式http://127.0.0.1:5000/xxx/img/1.jpg

flask初始化源码

1
2
3
4
5
6
7
8
9
10
11
12
13
# Add a static route using the provided static_url_path, static_host,
# and static_folder if there is a configured static_folder.
# Note we do this without checking if static_folder exists.
# For one, it might be created while the server is running (e.g. during
# development). Also, Google App Engine stores static files somewhere
if self.has_static_folder:
assert bool(static_host) == host_matching, 'Invalid static_host/host_matching combination'
self.add_url_rule(
self.static_url_path + '/<path:filename>',
endpoint='static',
host=static_host,
view_func=self.send_static_file
)

1
2
3
4
5
6
def _get_static_url_path(self):
if self._static_url_path is not None:
return self._static_url_path

if self.static_folder is not None:
return '/' + os.path.basename(self.static_folder)

通过源码可以看到在flask初始化时通过add_url_rule函数来注册静态资源的路由,
static_url_path函数调用了下面的_get_static_url_path方法来获取静态资源的访问路径
如果传递了static_url_path参数,就将其作为访问路径返回,否则就返回static_folderbasename

1
2
3
os.path.abspath  ->  G:\GitHub\fisher\test.py
os.path.dirname -> G:/GitHub/fisher
os.path.basename -> test.py

模板

在flask初始化时有一个template_folder参数,这个参数就是模板的默认位置
render_template方法的第一个参数时HTML的名称,这个参数必须位于template_folder指向的目录下面

蓝图初始化时也可以修改模板所在位置和静态文件所在位置
蓝图初始化源码

1
2
3
4
def __init__(self, name, import_name, static_folder=None,
static_url_path=None, template_folder=None,
url_prefix=None, subdomain=None, url_defaults=None,
root_path=None):

jinjia2的用法和DTL类似
http://docs.jinkan.org/docs/jinja2/

静态资源加载
图片路径:static\images\1.jpg

1
2
3
<img src="{{ url_for('static', filename='images/1.jpg') }}">
{{ url_for('static', filename='css/xxx.css') }}
{{ url_for('static', filename='js/xxx.js') }}

消息闪现、SecretyKey与变量作用域

Flask 提供了一个非常简单的方法来使用闪现系统向用户反馈信息.闪现系统使得在一个请求结束的时候记录一个信息,然后在且仅仅在下一个请求中访问这个数据.

在视图函数中配置闪现消息

1
2
flash("hello,wenfeng", category="success")
flash("hello,jiagndan", category="warning")

在html中使用闪现消息

1
2
{% set message = get_flashed_messages() %}
{{ message }}

使用set创建变量,这时的变量作用于在整个block

1
2
3
{% with message = get_flashed_messages() %}
{{ message }}
{% endwith %}

使用with关键字,可以随意控制作用域

用户登录与注册

viewmodel意义的体现与filter函数的巧妙应用

在搜索书籍页面里,需要将每一条结果的作者,出版社,价格在一行展示,并以”/“分割.由于这三个属性还有可能为空,所以在html模板里处理不太方便.我们选择将这些数据处理的工作放在viewmodel中.

简单粗暴一点的方法是写一段if-else代码,将这三个属性以及可能为空的情况全都穷举出来,但是python给我们提供了更优雅的解决方式,就是使用filter过滤器+lambda表达式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class BookViewModel:
def __init__(self, book):
self.title = book['title']
self.publisher = book['publisher']
self.pages = book['pages']
self.author = book['author']
self.price = book['price']
self.summary = book['summary']
self.image = book['image']

# @property装饰器可以让我们把一个方法当做一个属性来使用
@property
def intro(self):
intros = filter(lambda x: True if x else False,
[self.author, self.publisher, self.price])

return ' / '.join(intros)

书籍详情页面

业务逻辑梳理

  • 书籍详情页面,首先应该展示数据详情信息.
  • 书籍详情页面应该有加入心愿清单和赠送此书的功能
  • 书籍详情页面默认展示想要赠书次数的人,并且可以向他们索要书籍
  • 如果用户点击了赠送此书,那么他就成了一个赠书人,这个时候书籍详情页面会展示出想要这本书的人

编写思路

  • 书籍详情页面接受一个isbn作为参数,直接访问我们之前编写的yushu_booksearch_by_isbn函数即可.这需要我们在之前的BookViewModel中加入isbn属性
  • search_by_isbn返回的原始数据不应该直接返回,而应该经过裁剪加工,这里也可以复用我们之前写的BookViewModel.
  • BookViewModel需要接受一个book对象,由于search_by_isbn只会返回只有一个对象的列表,所以我们返回结果的第一个元素即可
  • 但是yushu_book.books[0]的写法并不是很好的编程规范,我们之所以可以这么写是因为我们清楚的内部结构,但是我们写的代码不一定是给我们自己用,给被人用的时候要让被人清晰易懂,所以这里,我们在yushu_book加入一个first函数返回第一个对象.

app/spider/yushu_book.py

1
2
3
@property
def first(self):
return self.books[0] if self.total >= 1 else None

app/web/book.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@web.route("/book/search/")
def search():
"""
搜索书籍路由
:param q: 关键字 OR isbn
:param page: 页码
"""
form = SearchForm(request.args)
books = BookCollection()
if form.validate():
q = form.q.data.strip()
isbn_or_key = is_isbn_or_key(q)

yushu_book = YuShuBook()

if isbn_or_key == 'isbn':
yushu_book.search_by_isbn(q)
else:
page = form.page.data
yushu_book.search_by_key(q, page)

books.fill(yushu_book, q)
else:
flash("搜索的关键字不符合要求,请重新输入关键字")

return render_template("search_result.html", books=books)

@web.route("/book/<isbn>/detail")
def book_detail(isbn):
yushu_book = YuShuBook()
yushu_book.search_by_isbn(isbn)
book = BookViewModel(yushu_book.first)
return render_template("book_detail.html", book=book, wishes=[], gifts=[])

app/view_models/book.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 具体的单一数据处理封装在BookViewModel里
class BookViewModel:
def __init__(self, book):
self.title = book['title']
self.author = '、'.join(book['author'])
self.binding = book['binding']
self.publisher = book['publisher']
self.image = book['image']
self.price = '¥' + book['price'] if book['price'] else book['price']
self.isbn = book['isbn']
self.pubdate = book['pubdate']
self.summary = book['summary']
self.pages = book['pages']


# @property装饰器可以让我们把一个方法当做一个属性来使用
@property
def intro(self):
intros = filter(lambda x: True if x else False,
[self.author, self.publisher, self.price])

return ' / '.join(intros)

模型关系

分析业务逻辑,用户赠送书籍,需要将用户赠送书籍的数据保存到数据库中
首先我们需要一个用户User模型,来存储用户信息
其次我们需要一个Book模型,来存储书籍的信息
我们还需要一个Gift模型,来存储哪个用户想要赠送哪本书.从数据库的角度来看,用户和书籍是多对多的关系,多对多的关系需要第三章表.

建立模型代码
app/models/user.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from sqlalchemy import Column, Integer, String, Boolean, Float

from app.models.base import Base



class User(Base):
id = Column(Integer, primary_key=True)
nickname = Column(String(24), nullable=False)
phone_number = Column(String(18), unique=True)
email = Column(String(50), unique=True, nullable=False)
confirmed = Column(Boolean, default=False)
beans = Column(Float, default=0)
send_counter = Column(Integer, default=0)
receive_counter = Column(Integer, default=0)
wx_open_id = Column(String(50))
wx_name = Column(String(32))

app/models/gift.py

1
2
3
4
5
6
7
8
9
10
11
12
13
from sqlalchemy import Column, Integer, ForeignKey, String, Boolean
from sqlalchemy.orm import relationship

from app.models.base import Base


class Gift(Base):
id = Column(Integer, primary_key=True, autoincrement=True)
uid = Column(Integer, ForeignKey('user.id'))
isbn = Column(String(15),nullable=True)
launched = Column(Boolean, default=False)

user = relationship('User')

自定义基类模型
每个表的信息,在删除的时候都不应该物理的从数据库里删除,而应该设置一个标志位,默认为0,如果删除了则置为1,这样可以搜索到历史的用户记录.
像标志位这样的参数,每个表里都有同样的属性,我们应该建立一个基类,来存储这些共有属性
app/models/base.py

1
2
3
4
5
6
7
8
9
10
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import Column, SmallInteger

db = SQLAlchemy()


class Base(db.Model):
__abstract__ = True
create_time = Column(Integer)
status = Column(SmallInteger, default=1)

用户注册

用户注册的界面,和注册POST请求共用同一个视图函数,兼容POST,GET请求.
表单验证的结果数据,赋值到User模型里,可以在Base类里编写一个set_attrs函数,统一将属性拷贝赋值.动态赋值.
验证器中还应该加入业务逻辑的校验,如email不能重复,这需要自己定义验证器,以vaildate_开头
使用filter_by自定义数据库查询
数据库的密码,前端传来的是明文,需要密文加密到数据库,应该给Userpassword提供getter/setter函数.在set值的时候,将password加密在赋值给User_password.
使用db.session采用ORM方式将数据存储到数据库
如果登录成功,则重定向到登录界面

app/web/auth.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from flask import render_template, flash, request
from flask_login import login_user

from app.forms.auth import RegisterForm
from app.models.base import db
from app.models.user import User
from app.web import web


@web.route('/register', methods=['GET', 'POST'])
def register():
form = RegisterForm(request.form)
if request.method == 'POST' and form.validate():
user = User()
user.set_attrs(form.data)

db.session.add(user)
db.session.commit()

return render_template('auth/register.html', form={'data': {}})

app/models/user.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from sqlalchemy import Column, Integer, String, Boolean, Float
from werkzeug.security import generate_password_hash

from app.models.base import Base



class User(Base):
...

@property
def password(self):
return self._password

@password.setter
def password(self, raw):
self._password = generate_password_hash(raw)

app/models/base.py

1
2
3
4
5
6
7
8
9

class Base(db.Model):
__abstract__ = True
status = Column(SmallInteger, default=1)

def set_attrs(self, attrs_dict):
for key, value in attrs_dict.items():
if hasattr(self, key) and key != 'id':
setattr(self, key, value)

app/forms/auth.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from wtforms import Form, StringField, PasswordField
from wtforms.validators import DataRequired, Length, ValidationError

from app.models.user import User


class EmailForm(Form):
email = StringField(validators=[
DataRequired(), Length(8, 64, message='电子邮箱不符合规范')])


class RegisterForm(EmailForm):
nickname = StringField('昵称', validators=[
DataRequired(), Length(2, 10, message='昵称至少需要两个字符,最多10个字符')])

password = PasswordField('密码', validators=[
DataRequired(), Length(6, 20)])

def validate_email(self, field):
if User.query.filter_by(email=field.data).first():
raise ValidationError('电子邮件已被注册')

def validate_nickname(self, field):
if User.query.filter_by(nickname=field.data).first():
raise ValidationError('昵称已存在')

用户登录

接受用户传来的参数并进行参数校验
使用email查询数据库并验证密码是否正确,密码的加密校验应该放在User模型类里,这样可以使得封装性更好,外部调用更加方便
email和密码校验未通过,则通过消息闪现通知客户端消息
email和密码校验通过,则通过flask提供的插件flask_login 将数据写入cookie

app/forms/auth.py

1
2
3
class LoginForm(EmailForm):
password = PasswordField('密码', validators=[
DataRequired(), Length(6, 20)])

app/web/auth.py

1
2
3
4
5
6
7
8
9
10
11
12
@web.route('/login', methods=['GET', 'POST'])
def login():
form = LoginForm(request.form)
if request.method == 'POST' and form.validate():
user = User.query.filter_by(email=form.email).first()
if user and user.check_password(user.password):
# 使用flask-login 的 login_user间接写入cookie
# 默认是暂时的cookie,关闭浏览器后cookie消失,如果想改成长期的需要传入关键字参数remember
login_user(user, remember=True)
else:
flash("账号不存在或者密码错误")
return render_template('auth/login.html', form=form)

在models中继承UserMixin

1
2
3
4
5
6
from flask_login import UserMixin

from app.models.base import Base


class User(UserMixin, Base):

将flask-login注册到app中
app/init.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from flask import Flask
from flask_login import LoginManager

from app import config, secure
from app.models.base import db

login_manager = LoginManager()

def create_app():
app = Flask(__name__)

app.config.from_object(config)
app.config.from_object(secure)

register_blueprint(app)

db.init_app(app=app)
db.create_all(app=app)

login_manager.init_app(app=app)

return app

访问权限控制与重定向攻击

在需要登录才能访问的试图函数上,加入@login_required装饰器
app/web/gift.py

1
2
3
4
@web.route('/my/gifts')
@login_required
def my_gifts():
return "my gifts"

User模型里,编写get_user方法用来根据id查询用户,并加入@login_manager.user_loader装饰器(login_managerapp/__init__.py中导入)

1
2
3
4

@login_manager.user_loader
def get_user(self, uid):
return User.query.get(int(uid))

app/__init__.py中,配置未登录时调整到的页面和提示消息

1
2
login_manager.login_view = 'web.login'
login_manager.login_message = '请先登录或注册'

登录成功以后,重定向到next页面;如果没有next页面,则跳转到首页;为了防止重定向攻击,应该判断next是否”/“开头

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@web.route('/login', methods=['GET', 'POST'])
def login():
form = LoginForm(request.form)
if request.method == 'POST' and form.validate():
user = User.query.filter_by(email=form.email.data).first()
if user and user.check_password(form.password.data):
login_user(user, remember=True)

next = request.args.get('next')
if not next or not next.startswith('/'):
return redirect(url_for('web.index'))
return redirect(next)
else:
flash("账号不存在或者密码错误")
return render_template('auth/login.html', form=form)

书籍交易模型(数据库事务、重写Flask中的对象)

鱼豆

鱼书的交易货币,在上传一本书的时候,将获取0.5个鱼豆.赠送一个本书的时候,再获取1个鱼豆.索要一本书的时候,消耗一个鱼豆,其中赠送和索要书籍是用户之间鱼豆互相加减,上传的时候是系统赠送.
基于上面的规则,来编写赠送鱼书的视图函数

判断当前书籍是否可以加入赠送清单

  1. 如果isbn编号不符合规则,不允许添加
  2. 如果isbn编号对应的书籍不存在,不允许添加
  3. 同一个用户,不能同时赠送同一本书籍
  4. 一个用户对于一本书不能既是赠书者,又是索要者
  5. 3和4合并成一条,就是一本书必须即不在心愿清单又不在赠书列表里才可以添加

app/models/user.py

1
2
3
4
5
6
7
8
9
10
11
12
def can_save_to_list(self, isbn):
if not is_isbn_or_key(isbn):
return False

yushu_book = YuShuBook()
yushu_book.search_by_isbn(isbn)
if not yushu_book.first:
return False

gifting = Gift.query.filter_by(uid=self.id, isbn=isbn, launched=False).first()
wishing = Wish.query.filter_by(uid=self.id, isbn=isbn, launched=False).first()
return not wishing and not gifting

app/models/wish.py

1
2
3
4
5
6
7
8
9
10
11
12
from sqlalchemy import Column, Integer, ForeignKey, String, Boolean
from sqlalchemy.orm import relationship

from app.models.base import Base


class Wish(Base):
id = Column(Integer, primary_key=True, autoincrement=True)
user = relationship('User')
uid = Column(Integer, ForeignKey('user.id'))
isbn = Column(String(15), nullable=True)
launched = Column(Boolean, default=False)

添加赠送清单,增加鱼豆

添加赠送清单,增加鱼豆对应了两个数据库操作,如果其中一个在执行过程中失败了,那么另一个也不能提交,这用到了数据库的事务.
给用户添加鱼豆需要获取当前用户,我们可以从flask_logincurrent_user获取当前用户

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@web.route('/gifts/book/<isbn>')
@login_required
def save_to_gifts(isbn):
if current_user.can_save_to_list(isbn):
try:
gift = Gift()
gift.isbn = isbn
gift.uid = current_user.id

current_user.beans += current_app.config['BEANS_UPLOAD_ONE_BOOK']

db.session.add(gift)
db.session.add(current_user)
db.session.commit()
except Exception as e:
db.session.rollback()
raise e
else:
flash("这本书以添加进您的赠送清单或已经存在于您的心愿清单,请不要重复添加")
return redirect(url_for('web.book_detail', isbn=isbn))

BEANS_UPLOAD_ONE_BOOK = 0.5

添加心愿清单

app/web/wish.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from flask import url_for, flash
from flask_login import current_user, login_required
from werkzeug.utils import redirect

from app.models.base import db
from app.models.wish import Wish
from app.web import web


@web.route('/wish/book/<isbn>')
@login_required
def save_to_wish(isbn):
if current_user.can_save_to_list(isbn):
with db.auto_commit():
wish = Wish()
wish.isbn = isbn
wish.uid = current_user.id

db.session.add(wish)
else:
flash("这本书以添加进您的赠送清单或已经存在于您的心愿清单,请不要重复添加")
return redirect(url_for('web.book_detail', isbn=isbn))

添加赠送清单和心愿清单使用Ajax,提升用户体验度

contextmanager

contextmanager可以简化上下文管理器,不需要我们编写__enter____exit__函数.他给了我们一个机会,让我们把之前一个不是上下文管理器的类变成一个上下文管理器,而不需要我们去修改这个类的源代码

其中yield的作用,是中断当前函数执行流程,先去执行yield出去的部分的代码执行流程
下面的代码的作用,在书籍前后自动加上

1
2
3
4
5
6
7
8
9
10
11
12
from contextlib import contextmanager


@contextmanager
def book_mark():
print('', end='')
yield
print('', end='')


with book_mark():
print('钢铁',end='')

结合继承,contextmanager,yield,rollback来简化try-except的数据库事务代码

通过contextmanager实现一个上下文管理器,将try-except的代码放在contextmanager里,将具体的业务逻辑代码yield出去
SQLAlchemy并没有这个上下文管理器,但是我们可以做一个子类,来扩展他的功能
编写子类的时候,命名是非常不好起的,我们可以改变父类的名字,给子类命名为原父类的名字
app/models/base.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from contextlib import contextmanager

from flask_sqlalchemy import SQLAlchemy as _SQLAlchemy
from sqlalchemy import Column, SmallInteger

class SQLAlchemy(_SQLAlchemy):
@contextmanager
def auto_commit(self):
try:
yield
self.session.commit()
except Exception as e:
self.session.rollback()
raise e


db = SQLAlchemy()

使用auto_commitsave_to_gifts视图函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@web.route('/gifts/book/<isbn>')
@login_required
def save_to_gifts(isbn):
if current_user.can_save_to_list(isbn):
with db.auto_commit():
gift = Gift()
gift.isbn = isbn
gift.uid = current_user.id

current_user.beans += current_app.config['BEANS_UPLOAD_ONE_BOOK']

db.session.add(gift)
else:
flash("这本书以添加进您的赠送清单或已经存在于您的心愿清单,请不要重复添加")
return redirect(url_for('web.book_detail', isbn=isbn))

使用auto_commit的register视图函数

1
2
3
4
5
6
7
8
9
10
11
12
13
@web.route('/register', methods=['GET', 'POST'])
def register():
form = RegisterForm(request.form)
if request.method == 'POST' and form.validate():
with db.auto_commit():
user = User()
user.set_attrs(form.data)

db.session.add(user)

return redirect(url_for('web.login'))

return render_template('auth/register.html', form=form)

1.遇到比较复杂的问题,应该把他单独的分离出来,在一个单独的文件里来编写一些非常简单的源码,因为业务越简单,越能够让我们去关注知识和原理本身的相关问题.
2.高级编程不是在于学习更高级的语法(学会更好),更关键的在于能够用自己所学的知识,写出更好的代码来
3.对知识的综合运用能力很重要,将单个的知识点组合在一起写出一段很好的代码来

书籍交易视图模型

书籍详情页,除了需要显示书籍详情信息外.还应该显示其他信息,这些信息分为三类
1.默认情况下,显示想要赠送这本书的人的列表,包括名字和上传时间.
2.如果当前用户是此书的赠送者,应该显示索要这本书的人的列表.
3.如果当前用户是此书的索要者,应该显示想要赠送这本书的人的列表.

综上所述,我们一共需要两个列表,这本书的索要人列表和这本书的赠书人的列表,根据不同情况进行展示.

1
2
3
# 赠书人列表和索要人列表
trade_gifts = Gift.query.filter_by(isbn=isbn).all()
trade_wishs = Wish.query.filter_by(isbn=isbn).all()

view_model中处理这两个列表的原始数据,加工成需要的数据.由于gifts,wishs两个的加工逻辑一样,只是数据库表不一样,所以可以写一个统一的类trade来处理
app/view_models/trade.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class TradeInfo:

def __init__(self, goods):
self.total = 0
self.trades = []
self.__parse(goods)

def __parse(self, goods):
self.total = len(goods)
self.trades = [self.__map_to_trade(single) for single in goods]

def __map_to_trade(self, single):
if single.create_datetime:
time = single.create_datetime.strftime('%Y-%m-%d')
else:
time = '未知'
return dict(
user_name=single.user.nickname,
time=time,
id=single.id
)

create_time 本是int类型,要进行strftime格式化操作需要转化成string类型,这个操作每个模型都要用到,所以编写在base.py

1
2
3
4
5
6
@property
def create_datetime(self):
if self.create_time:
return str(self.create_time)
else:
return None

接下来完善书籍详情视图函数.区分上面说的三种情况.使用current_useris_authenticated函数判断用户是否登录.然后分别以当前用户id为查询条件去wish表和gift表里查询,如果能查询到,则将对应的has_in_gifts/has_in_wishs设置为True

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@web.route("/book/<isbn>/detail")
def book_detail(isbn):
has_in_gifts = False
has_in_wishs = False

# 取出每本书的详情
yushu_book = YuShuBook()
yushu_book.search_by_isbn(isbn)
book = BookViewModel(yushu_book.first)

# 三种情况的判断
if current_user.is_authenticated:
if Gift.query.filter_by(uid=current_user.id).first():
has_in_gifts = True
if Wish.query.filter_by(uid=current_user.id).first():
has_in_wishs = True

# 赠书人列表和索要人列表
trade_gifts = Gift.query.filter_by(isbn=isbn).all()
trade_wishs = Wish.query.filter_by(isbn=isbn).all()

return render_template("book_detail.html", book=book,
wishes=trade_wishs, gifts=trade_gifts,
has_in_wishs=has_in_wishs, has_in_gifts=has_in_gifts)

重写filter_by

由于我们的删除操作都是逻辑删除,所以在查询的时候应该默认查询status=1的记录(即未删除的记录),但是如果在每一个filter_by里都这么写,就太麻烦了,我们的思路是重写默认的filter_by函数,加上status=1的限制条件.

先了解一下原来SQLAlchemy的继承关系
Flask的SQLAlchemy中有一个BaseQuery,BaseQuery继承了orm.Query(原SQLAlchemy的类),这里面有filter_by函数;也就是说BaseQuery通过继承orm.Query拥有了filter_by的能力

1
2
3
4
5
6
7
8
9
10
class SQLAlchemy(object):
Query = None

def __init__(self, app=None, use_native_unicode=True, session_options=None,
metadata=None, query_class=BaseQuery, model_class=Model):
...
...

class BaseQuery(orm.Query):
...

orm.Query

1
2
3
4
5
def filter_by(self, **kwargs):
# for循环拼接关键字参数查询条件
clauses = [_entity_descriptor(self._joinpoint_zero(), key) == value
for key, value in kwargs.items()]
return self.filter(sql.and_(*clauses))

所以如果我们要重写filter_by,就需要自己编写子类,继承BaseQuery,重写filter_by函数,将status=1加入到kwargs

1
2
3
4
5
class Query(BaseQuery):
def filter_by(self, **kwargs):
if 'status' not in kwargs:
kwargs['status'] = 1
return super(Query, self).filter_by(**kwargs)

最后,Flask的SQLAlchemy给了我们一种方法,让我们应用自己的Query类,即在实例化的时候传入关键字参数query_class

1
db = SQLAlchemy(query_class=Query)

鱼书业务处理

最近的礼物

首页会显示最近的赠送书籍列表.这个列表有三个限制条件:
1.数量不超过30
2.按照时间倒序排列,最新的排在最前面
3.去重,同一本书籍的礼物不重复出现

首先编写复杂SQL对应的ORM代码
app/models/gift.py

1
2
3
4
5
6
7
8
9
10
@classmethod
def recent(cls):
# select distinct * from gift group by isbn order by create_time limit 30
recent_gifts = Gift.query \
.filter_by(launched=False) \
.group_by(Gift.isbn) \
.order_by(Gift.create_time) \
.limit(30) \
.distinct().all()
return recent_gifts

为什么要定义成类方法

对象代表一个礼物,是具体的
类代表礼物这个事物,他是抽象的,不是具体的一个

业务的四种编写方案

1.编写在models的对应的gift.py里.
2.编写在视图函数里.
3.在models里建立新的RecentGift模块.
4.建立service层.

编写视图函数
recent函数获取到的gift列表里的每一个gift,都只有isbn编号.但是我们需要把图书的信息都返回回去.这就需要拿isbn编号去YushBook可去查询出书籍的详情信息然后再使用BookViewModel进行封装.

但是上面这段逻辑,不应该写在视图函数的for循环中,他是Gift的行为,应该封装到Gift的模型中去.

1
2
3
4
5
@property
def book(self):
yushu_book = YuShuBook()
yushu_book.search_by_isbn(self.isbn)
return yushu_book.first

不应该在Gift模型中直接返回BookViewModel,因为我们的模型只负责处理原始数据,所有的处理ViewModel都应该放到视图函数里面进行.

app/web/main.py

1
2
3
4
5
@web.route('/')
def index():
recent_gifts = Gift.recent()
books = [BookViewModel(gift.book) for gift in recent_gifts]
return render_template('index.html', recent=books)

之所以能够在调用的地方用一个很简单的列表推导式就完成了这么复杂的逻辑,就是因为我们封装的非常良好.这里面涉及到了几个对象的相互调用bookviewmodel,gift,yushubook;这才是在真正的写面向对象的代码,面向对象就是几个对象在相互调用,各自的逻辑是非常清晰的(单一职责模式)良好的封装是优秀代码的基础

我的礼物(赠送清单)

业务逻辑
赠送清单

复杂点
所有礼物
如果循环次数过多的话,第一种方案是很难接受的

代码编写
app/models/gift.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@classmethod
def get_user_gifts(cls, uid):
gifts = Gift.query \
.filter_by(uid=uid, launched=False) \
.order_by(desc(Gift.create_time)) \
.all()
return gifts

@classmethod
def get_wish_counts(cls, isbn_list):
# 根据传入的一组isbn编号,到Wish表中计算出某个礼物的Wish心愿数量
# select count(id),isbn from wish
# where launched = false and isbn in ('','') and status =1 group by isbn
from app.models.wish import Wish

count_list = db.session.query(func.count(Wish.id), Wish.isbn).filter(
Wish.launched == False,
Wish.isbn.in_(isbn_list),
Wish.status == 1).group_by(
Wish.isbn).all()
# 不要将tuple返回到外部,应该返回有意义的字典或者对象
count_list = [{'count': w[0], 'isbn':w[1]} for w in count_list]
return count_list

view_model/gift.py 对原始数据进行加工

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class Gifts:

def __init__(self, gifts_of_mine, wish_count_list):
self.gifts = []
self.__gifts_of_mine = gifts_of_mine
self.__wish_count_list = wish_count_list

self.gifts = self.__parse()

def __parse(self):
temp_gifts = []
for gift in self.__gifts_of_mine:
my_gift = self.__matching(gift)
temp_gifts.append(my_gift)
return temp_gifts

def __matching(self, gift):
count = 0
for wish_count in self.__wish_count_list:
if gift.isbn == wish_count['isbn']:
count = wish_count['count']
r = {
'wishes_count': count,
'book': BookViewModel(gift.book),
'id': gift.id
}
return r

web/gift.py 对视图函数进行组装

1
2
3
4
5
6
7
8
9
@web.route('/my/gifts')
@login_required
def my_gifts():
uid = current_user.id
gifts_of_mine = Gift.get_user_gifts(uid)
isbn_list = [gift.isbn for gift in gifts_of_mine]
wish_count_list = Gift.get_wish_counts(isbn_list)
view_model = Gifts(gifts_of_mine, wish_count_list)
return render_template('my_gifts.html', gifts=view_model.gifts)

上面获取原始数据,是对两张表分别查询,再组装,我们也可以进行连表查询,下面是两种方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@classmethod
def get_user_gifts_by_sql(cls, uid):
sql = 'select a.id,a.isbn,count(b.id)' \
'from gift a left join wish b on a.isbn = b.isbn ' \
'where b.uid = %s and a.launched = 0 and b.launched = 0 ' \
'and a.status = 1 and b.status = 1 ' \
'group by a.id,a.isbn order by a.create_time desc'.replace('%s', str(uid))
gifts = db.session.execute(sql)
gifts = [{'id': line[0], 'isbn': line[1], 'count':line[2]} for line in gifts]
return gifts

@classmethod
def get_user_gifts_by_orm(cls, uid):
from app.models.wish import Wish

gifts = db.session\
.query(Gift.id, Gift.isbn, func.count(Wish.id))\
.outerjoin(Wish, Wish.isbn == Gift.isbn)\
.filter(
Gift.launched == False,
Wish.launched == False,
Gift.status == 1,
Wish.status == 1,
Gift.uid == uid)\
.group_by(Gift.id, Wish.isbn)\
.order_by(desc(Gift.create_time))\
.all()
gifts = [{'gift': gift[0], 'count':gift[1]} for gift in gifts]
return gifts

我的心愿(心愿清单)

代码和我的礼物相似
app/models/wish.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@classmethod
def get_user_wishes(cls, uid):
wishes = Wish.query \
.filter_by(uid=uid, launched=False) \
.order_by(desc(Wish.create_time)) \
.all()
return wishes

@classmethod
def get_gifts_counts(cls, isbn_list):
from app.models.gift import Gift
count_list = db.session.query(func.count(Gift.id), Gift.isbn).filter(
Gift.launched == False,
Gift.isbn.in_(isbn_list),
Gift.status == 1).group_by(
Gift.isbn).all()
# 不要将tuple返回到外部,应该返回有意义的字典或者对象
count_list = [{'count': w[0], 'isbn':w[1]} for w in count_list]
return count_list

@property
def book(self):
yushu_book = YuShuBook()
yushu_book.search_by_isbn(self.isbn)
return yushu_book.first

giftwishview_model可以合并成一个MyTrade.实际上Trade应该是giftwish的基类,在我们这里他们之间没有行为差异,之间用一个即可,如果他们有了行为差异,就应该分别继承Trade实现自己的业务逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class MyTrade:

def __init__(self, trades_of_mine, trades_count_list):
self.trades = []
self.__trades_of_mine = trades_of_mine
self.__trades_count_list = trades_count_list

self.trades = self.__parse()

def __parse(self):
temp_trades = []
for trade in self.__trades_of_mine:
my_trade = self.__matching(trade)
temp_trades.append(my_trade)
return temp_trades

def __matching(self, trade):
count = 0
for trade_count in self.__trades_count_list:
if trade.isbn == trade_count['isbn']:
count = trade_count['count']
r = {
'wishes_count': count,
'book': BookViewModel(trade.book),
'id': trade.id
}
return r

Python与Flask的结合应用

密码重置

用户点击忘记密码,跳转到重置密码页面,输入自己的邮箱,系统给这个邮箱发送一封含有个人信息的邮件,用户进入邮箱点击链接进行密码的重置

first_or_404和可调用对象

视图函数接受用户填写的email账号,如果不存在应该跳转到404界面,这个逻辑flask-sqlalchemy为我们提供了良好的封装,不需要手动去处理,只需要调用Queryfirst_or_404()方法即可

1
user = User.query.filter_by(email=account_email).first_or_404()

first_or_404内部是对first函数的封装,他在获取了first的结果以后,会进行空判断,如果结果为空,则调用abort()方法,而abort()方法内部是调用了一个对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def first_or_404(self):
"""Like :meth:`first` but aborts with 404 if not found instead of returning ``None``."""

rv = self.first()
if rv is None:
abort(404)
return rv


def abort(status, *args, **kwargs):
'''
Raises an :py:exc:`HTTPException` for the given status code or WSGI
application::

abort(404) # 404 Not Found
abort(Response('Hello World'))

Can be passed a WSGI application or a status code. If a status code is
given it's looked up in the list of exceptions and will raise that
exception, if passed a WSGI application it will wrap it in a proxy WSGI
exception and raise that::

abort(404)
abort(Response('Hello World'))

'''
return _aborter(status, *args, **kwargs)

_aborter = Aborter()

_aborter是一个实例对象,但是依然可以像调用函数一样调用,是因为其内部实现了__call__方法,如果要将一个对象像函数一样调用,需要在函数内部实现__call__方法,_aborter(status, *args, **kwargs)实际上就是调用了Aborter__call__方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Aborter(object):

"""
When passed a dict of code -> exception items it can be used as
callable that raises exceptions. If the first argument to the
callable is an integer it will be looked up in the mapping, if it's
a WSGI application it will be raised in a proxy exception.

The rest of the arguments are forwarded to the exception constructor.
"""

def __init__(self, mapping=None, extra=None):
if mapping is None:
mapping = default_exceptions
self.mapping = dict(mapping)
if extra is not None:
self.mapping.update(extra)

def __call__(self, code, *args, **kwargs):
if not args and not kwargs and not isinstance(code, integer_types):
raise HTTPException(response=code)
if code not in self.mapping:
raise LookupError('no exception for %r' % code)
raise self.mapping[code](*args, **kwargs)

可调用对象

统一调用接口的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class A:
def go(self):
return object()

class B:
def run(self):
return object()

def func():
return object()

def main(param):
# 我想在main中传入一个参数,得到一个对象object
# b.run()
# a.go()
# func()
pass

main(A())
main(B())
main(func)

如果不适用可调用对象,我们需要在main函数中区分是不同的情况,分别处理,非常的麻烦.
如果main方法传入的参数只有方法那就好说了,我们只需要param()就可以调用,所以python为我们提供了可调用对象,让对象可以像方法一样调用,修改好的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class A:
def __call__(self, *args, **kwargs):
return object()

class B:
def __call__(self, *args, **kwargs):
return object()

def func():
return object()

def main(callback):
callback()

main(A())
main(B())
main(func)

在flask中有很多可调用对象的使用,如globas.py中对current_app,request等对象的封装.其中_find_app,_lookup_req_object都是可调用对象

1
2
3
4
5
6
7
# context locals
_request_ctx_stack = LocalStack()
_app_ctx_stack = LocalStack()
current_app = LocalProxy(_find_app)
request = LocalProxy(partial(_lookup_req_object, 'request'))
session = LocalProxy(partial(_lookup_req_object, 'session'))
g = LocalProxy(partial(_lookup_app_object, 'g'))

HttpException

first_ot_404的异常抛出流程
first_ot_404调用abort,abort调用Aborter()__call__方法,由于不满足
__call__的前两个判断,最终会抛出self.mapping[code](*args, **kwargs),在其构造函数中对self.mapping进行了定义,就是一个dict,里面封装了default_exceptions,default_exceptions的数据填充是在_find_exceptions函数中进行的,这个函数的作用是扫描当前模块下HTTPException的子类,并将数据加载到default_exceptions

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
default_exceptions = {}
__all__ = ['HTTPException']


def _find_exceptions():
for name, obj in iteritems(globals()):
try:
is_http_exception = issubclass(obj, HTTPException)
except TypeError:
is_http_exception = False
if not is_http_exception or obj.code is None:
continue
__all__.append(obj.__name__)
old_obj = default_exceptions.get(obj.code, None)
if old_obj is not None and issubclass(obj, old_obj):
continue
default_exceptions[obj.code] = obj
_find_exceptions()
del _find_exceptions

最终Aborter函数的__call__方法拿着封装好的self.mapping(实质是default_exceptions)通过参数传来的code去匹配相应的异常,并进行抛出.

404源码

1
2
3
4
5
6
7
8
9
10
11
12
class NotFound(HTTPException):

"""*404* `Not Found`

Raise if a resource does not exist and never existed.
"""
code = 404
description = (
'The requested URL was not found on the server. '
'If you entered the URL manually please check your spelling and '
'try again.'
)

网页上显示的内容就是NotFound中的description

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def get_description(self, environ=None):
"""Get the description."""
return u'<p>%s</p>' % escape(self.description)

def get_body(self, environ=None):
"""Get the HTML body."""
return text_type((
u'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">\n'
u'<title>%(code)s %(name)s</title>\n'
u'<h1>%(name)s</h1>\n'
u'%(description)s\n'
) % {
'code': self.code,
'name': escape(self.name),
'description': self.get_description(environ)
})

def get_headers(self, environ=None):
"""Get a list of headers."""
return [('Content-Type', 'text/html')]

def get_response(self, environ=None):
"""Get a response object. If one was passed to the exception
it's returned directly.

:param environ: the optional environ for the request. This
can be used to modify the response depending
on how the request looked like.
:return: a :class:`Response` object or a subclass thereof.
"""
if self.response is not None:
return self.response
if environ is not None:
environ = _get_environ(environ)
headers = self.get_headers(environ)
return Response(self.get_body(environ), self.code, headers)

如果不想显示默认的404页面,可以自定义404的视图函数

1
2
3
4
5
6
# web是蓝图对象,当然也可以使用app对象
# app_errorhandler接受一个状态码,代表当前方法处理的异常
# not_found函数不是只能返回视图,他可以做任何你想做的事情
@web.app_errorhandler(404)
def not_found(e):
return render_template('404.html'), 404

发送电子邮件

使用flask-mail插件来完成邮件的发送
pip install flask-mail

注册插件

1
2
mail = Mail()
mail.init_app(app)

EMAIL配置

1
2
3
4
5
6
7
8
# email配置
MAIL_SERVER = 'smtp.qq.com'
MAIL_PORT = 465
MAIL_USE_SSL = True
MAIL_USE_TSL = False
MAIL_USERNAME = '1782509343@qq.com'
# QQ邮箱->设置->账户->[POP3...]->生成授权码->发送短信->获取授权码
MAIL_PASSWORD = 'pstomjiomwyybadh'

邮件发送代码
app/libs/email.py

1
2
3
4
5
6
7
8
9
10
11
12
from flask import current_app, render_template
from flask_mail import Message


def send_email(to, subject, template, **kwargs):
msg = Message(
subject,
sender=current_app.config['MAIL_USERNAME'],
recipients=[to])
# 发送一封HTML邮件
msg.html = render_template(template, kwargs)
msg.send(msg)

测试

1
2
send_email(account_email, "重置你的密码", 'email/reset_password.html',
user=user, token='aaa')

使用itsdangerous生成token
token保存了用户的身份信息,应该有一个过期时间,这里使用itsdangerous插件
app/models/user.py

1
2
3
4
def generate_token(self, expiration=600):
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
s = Serializer(current_app.config['SECRET_KEY'], expiration)
return s.dumps({'id': self.id}).decode('utf-8')

app/models/user.py

1
2
3
4
5
6
7
8
9
10
11
12
13
@staticmethod
def reset_password(token, new_password):
s = Serializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token.encode('utf-8'))
except:
return False
user = User.query.get(data.get('id'))
if user is None:
return False
user.password = new_password
db.session.commit()
return True

视图函数编写

1
2
3
4
5
6
7
8
9
10
11
@web.route('/reset/password/<token>', methods=['GET', 'POST'])
def forget_password(token):
form = ResetPasswordForm(request.form)
if request.method == 'POST' and form.validate():
success = User.reset_password(token, form.password1.data)
if success:
flash('您的密码已重置,请使用新密码登录')
return redirect(url_for('web.login'))

flash('密码重置失败')
return render_template('auth/forget_password.html')

异步发送邮件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from threading import Thread

from flask import current_app, render_template
from flask_mail import Message, Mail

mail = Mail()

def send_async_email(app, msg):
with app.app_context():
try:
mail.send(msg)
except Exception as e:
pass


def send_email(to, subject, template, **kwargs):
app = current_app._get_current_object()
msg = Message('[鱼书]' + ' ' + subject,
sender=app.config['MAIL_SENDER'], recipients=[to])
# msg.body = render_template(template + '.txt', **kwargs)
msg.html = render_template(template + '.html', **kwargs)
thr = Thread(target=send_async_email, args=[app, msg])
thr.start()
return thr

鱼漂(联系方式)

业务逻辑
鱼漂

模型设计

app/models/drift.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from sqlalchemy import Column, Integer, String, SmallInteger

from app.models.base import Base


class Drift(Base):
"""
一次具体的交易信息
"""
__tablename__ = 'drift'


id = Column(Integer, primary_key=True)

# 邮寄信息
recipient_name = Column(String(20), nullable=False)
address = Column(String(100), nullable=False)
message = Column(String(200))
mobile = Column(String(20), nullable=False)

# 书籍信息
isbn = Column(String(13))
book_title = Column(String(50))
book_author = Column(String(30))
book_img = Column(String(50))

# requester_id = Column(Integer, ForeignKey('user.id'))
# requester = relationship('User')

# 请求者信息
requester_id = Column(Integer)
requester_nickname = Column(String(20))

# 赠送这信息
gifter_id = Column(Integer)
gift_id = Column(Integer)
gifter_nickname = Column(String(20))

# 状态
_pending = Column('pending', SmallInteger, default=1)
# gift_id = Column(Integer, ForeignKey('gift.id'))
# gift = relationship('Gift')

状态信息
app/libs/enums.py

1
2
3
4
5
6
7
8
9
from enum import Enum


class PendingStatus(Enum):
"""交易状态"""
waiting = 1
success = 2
reject = 3
redraw = 4

条件检测

自己不能够向自己索要数据
app/models/gift.py

1
2
3
def is_yourself_gift(self, uid):
if self.uid == uid:
return True

鱼豆数量必须大于等于1
每索取两本书,必须赠送一本书
app/models/user.py

1
2
3
4
5
6
7
8
9
10
def can_send_drifts(self):
if self.beans < 1:
return False
success_gift_count = Gift.query.filter_by(
uid=self.id, launched=True).count()

success_receive_count = Drift.query.filter_by(
uid=self.id, pending=PendingStatus.Success).count()

return floor(success_receive_count / 2) <= success_gift_count

业务逻辑

视图函数
app/web/drift.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@web.route('/drift/<int:gid>', methods=['GET', 'POST'])
@login_required
def send_drift(gid):
current_gift = Gift.query.get_or_404(gid)
if current_gift.is_yourself_gift(current_user.id):
flash('这本书是自己的(*^▽^*),不能向自己索要哦')
return redirect(url_for('web.book_detail', isbn=current_gift.isbn))

can = current_user.can_send_drifts()
if not can:
return render_template('not_enough_beans.html', beans=current_user.beans)

form = DriftForm(request.form)
if request.method == 'POST' and form.validate():
drift = Drift()
drift.save_drift(form, current_gift)
send_email(current_gift.user.email, '有人想要一本书', 'email/get_gift.html',
wisher=current_user,
gift=current_gift)
# 成功后跳转到鱼漂历史记录界面
return redirect(url_for('web.pending'))
# summary用户的简介频繁使用,且更像是一种用户的属性,所以作为用户的一个属性
gifter = current_gift.user.summary
return render_template('drift.html', gifter=gifter, user_beans=current_user.beans, form=form)

app/forms/book.py

1
2
3
4
5
6
7
8
9
10
class DriftForm(Form):
recipient_name = StringField(
'收件人姓名', validators=[DataRequired(), Length(min=2, max=20,
message='收件人姓名长度必须在2到20个字符之间')])
mobile = StringField('手机号', validators=[DataRequired(),
Regexp('^1[0-9]{10}$', 0, '请输入正确的手机号')])
message = StringField('留言')
address = StringField(
'邮寄地址', validators=[DataRequired(),
Length(min=10, max=70, message='地址还不到10个字吗?尽量写详细一些吧')])

app/models/user.py

1
2
3
4
5
6
7
8
@property
def summary(self):
return dict(
nickname=self.nickname,
beans=self.beans,
email=self.email,
send_receive=str(self.send_counter) + '/' + str(self.receive_counter)
)

app/models/drift.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def save_drift(self, drift_form, current_gift):
with db.auto_commit():
drift = Drift()
# form中的信息填充到drift中
drift_form.populate_obj(drift)

drift.gift_id = current_gift.id
drift.requester_id = current_user.id
drift.requester_nickname = current_user.nickname
drift.gifter_nickname = current_gift.user.nickname
drift.gifter_id = current_gift.user.id

book = BookViewModel(current_gift.book)
drift.book_title = book.title
drift.book_author = book.author
drift.book_img = book.image
drift.isbn = book.isbn

db.session.add(drift)

current_user.beans -= 1

鱼漂(交易记录)

获取记录

1
2
3
drifts=Drift.query.filter(or_(Drift.requester_id==current_user.id,
Drift.gifter_id==current_user.id),
Drift.status==1).order_by(desc(Drift.create_time)).all()

Drift ViewModel 编写
Drift ViewModel需要适应当前用户是赠送者和当前用户是索要者两个情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from app.libs.enums import PendingStatus


class DriftViewModel():
def __init__(self,drift,current_user_id):
self.data={}
self.data=self.__parse(drift,current_user_id)
def __parse(self,drift,current_user_id):
you_are=DriftViewModel.is_requester_or_gift(drift,current_user_id)
status_str=PendingStatus.pending(drift.pending,you_are)
r={
'you_are':you_are,
'drift_id':drift.id,
'book_title':drift.book_title,
'book_author':drift.book_author,
'book_img':drift.book_img,
'date':drift.create_datetime.strftime('%Y-%m-%d'),
'message':drift.message,
'address':drift.address,
'operator':drift.gifter_name if you_are=='requester' else drift.requester_name,
'recipient_name':drift.recipient_name,
'mobile':drift.mobile,
'status':drift.pending,
'status_str':status_str
}
return r
@staticmethod
def is_requester_or_gift(drift,uid):
return 'requester' if drift.requester_id==uid else 'gifter'
class DriftCollection():
def __init__(self,drifts,current_user_id):
self.data=[]
self.__parse(drifts,current_user_id)
def __parse(self,drifts,current_user_id):
for drift in drifts:
temp=DriftViewModel(drift,current_user_id)
self.data.append(temp.data)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from enum import Enum


class PendingStatus(Enum):
Waiting = 1
Success = 2
Reject = 3
Redraw = 4

@classmethod
def pending(cls, status, key):
key_map = {
cls.Waiting: {
'requester': '等待对方邮寄',
'gifter': '等待你邮寄'
},
cls.Success: {
'requester': '对方已邮寄',
'gifter': '你已邮寄'
},
cls.Reject: {
'requester': '对方已拒绝',
'gifter': '你已拒绝'
},
cls.Redraw: {
'requester': '你已撤销',
'gifter': '对方已撤销'
}
}
return key_map[status][key]

其它操作

撤销操作业务逻辑

1
2
3
4
5
6
7
8
9
10
@web.route('/drift/<int:did>/redraw')
@login_required
def redraw_drift(did):
with db.auto_commit():
drift = Drift.query.filter(Drift.id == did, Drift.requester_id == current_user.id,Drift._pending==PendingStatus.Waiting.value,Drift.status==1).first_or_404()
drift.pending=PendingStatus.Redraw
current_user.beans+=1
db.session.add(drift)
flash('已经成功撤销一条鱼漂请求')
return redirect(url_for('web.pending'))

拒绝操作业务逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
@web.route('/drift/<int:did>/reject')
@login_required
def reject_drift(did):
drift = Drift.query.filter(
Drift.id == did, Drift.gifter_id == current_user.id,Drift._pending==PendingStatus.Waiting.value,Drift.status==1).first_or_404()
with db.auto_commit():
drift.pending=PendingStatus.Reject
requester=User.query.filter(User.id==drift.requester_id).first_or_404()
requester.beans+=1
db.session.add(requester)
db.session.add(drift)
flash('已经成功拒绝一条鱼漂请求')
return redirect(url_for('web.pending'))

完成邮寄

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@web.route('/drift/<int:did>/mailed')
@login_required
def mailed_drift(did):
with db.auto_commit():
drift=Drift.query.filter(Drift.id==did,Drift.gifter_id==current_user.id,Drift._pending==PendingStatus.Waiting.value,Drift.status==1).first_or_404()
# 查找索要者
requester=User.query.filter_by(id=drift.requester_id).first_or_404()
drift.pending=PendingStatus.Success
current_user.beans+=1
requester.receive_counter+=1
current_user.send_counter+=1
gift=Gift.query.get_or_404(drift.gift_id)
gift.launched=True
# 这个人可能心愿里没有添加这本书,但是直接向别人索要
wish=Wish.query.filter(Wish.isbn==drift.isbn,Wish.uid==drift.requester_id,Wish.launched==False,Drift.status==1).first()
if wish:
wish.launched=True
db.session.add(wish)
db.session.add(requester)
db.session.add(drift)
db.session.add(gift)
flash('已经成功邮寄一条鱼漂了~感谢您的公益风险~')
return redirect(url_for('web.pending'))

撤销赠送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@web.route('/gifts/<gid>/redraw')
@login_required
def redraw_from_gifts(gid):
gift=Gift.query.filter_by(id=gid,uid=current_user.id,launched=False).first_or_404()
drift=Drift.query.filter_by(isbn=gift.isbn,gift_id=gid,gifter_id=current_user.id,_pending=PendingStatus.Waiting.value).first()

if drift:
flash('这个礼物正处于交易状态,请先前往鱼漂完成该交易')
else :
with db.auto_commit():
current_user.beans-=current_app.config['BEANS_UPLOAD_ONE_BOOK']
gift.delete()
flash('您已经成功撤销书籍: 《'+gift.book['title']+'》的赠送')
return redirect(url_for('web.my_gifts'))

撤销心愿

1
2
3
4
5
6
7
@web.route('/wish/book/<isbn>/redraw')
def redraw_from_wish(isbn):
wish=Wish.query.filter_by(uid=current_user.id,isbn=isbn,launched=False).first_or_404()
with db.auto_commit():
wish.delete()
flash('您已经成功撤销书: 《'+wish.book['title']+'》的心愿')
return redirect(url_for('web.my_wish'))
-------------本文结束感谢您的阅读-------------
坚持原创技术分享,您的支持将鼓励我继续创作!
0%