《Mastering Python Design patterns》读书笔记¶
作者:Sakis Kasampalis
2023.02.04
【创建型】¶
1. 工厂模式¶
工厂方法
# @Author : baili
# @File : factory_method
# @Time : 2023/2/5 16:27
import xml.etree.ElementTree as etree
import json
class JSONConnector:
def __init__(self, filepath):
self.data = dict()
with open(filepath, mode='r', encoding='utf-8') as f:
self.data = json.load(f)
@property
def parsed_data(self):
return self.data
class XMLConnector:
def __init__(self, filepath):
self.tree = etree.parse(filepath)
@property
def parsed_data(self):
return self.tree
def connection_factory(filepath):
if filepath.endswith('json'):
connector = JSONConnector
elif filepath.endswith('xml'):
connector = XMLConnector
else:
raise ValueError(f'cannot connect to {filepath}')
return connector(filepath)
def connect_to(filepath):
factory = None
try:
factory = connection_factory(filepath)
except ValueError as ve:
print(ve)
finally:
return factory
def main():
sqlite_factory = connect_to('scripts/data/person.sq3')
xml_factory = connect_to('scripts/data/person.xml')
xml_data = xml_factory.parsed_data
liars = xml_data.findall(".//{}[{}='{}']".format('person', 'lastName', 'Liar'))
print(f'found: {len(liars)} persons')
for liar in liars:
print(f'first name: {liar.find("firstName").text}')
print(f'last name: {liar.find("lastName").text}')
json_factory = connect_to('scripts/data/donut.json')
json_data = json_factory.parsed_data
print(f'found: {len(json_data)} donuts')
for donut in json_data:
print(f'name: {donut["name"]}')
print(f'price: {donut["ppu"]}')
if __name__ == '__main__':
main()
peson.xml:
<persons>
<person>
<firstName>John</firstName>
<lastName>Smith</lastName>
<age>25</age>
<address>
<streetAddress>21 2nd Street</streetAddress>
<city>New York</city>
<state>NY</state>
<postalCode>10021</postalCode>
</address>
<phoneNumbers>
<phoneNumber type="home">212 555-1234</phoneNumber>
<phoneNumber type="fax">646 555-4567</phoneNumber>
</phoneNumbers>
<gender>
<type>male</type>
</gender>
</person>
<person>
<firstName>Jimmy</firstName>
<lastName>Liar</lastName>
<age>19</age>
<address>
<streetAddress>18 2nd Street</streetAddress>
<city>New York</city>
<state>NY</state>
<postalCode>10021</postalCode>
</address>
<phoneNumbers>
<phoneNumber type="home">212 555-1234</phoneNumber>
</phoneNumbers>
<gender>
<type>male</type>
</gender>
</person>
<person>
<firstName>Patty</firstName>
<lastName>Liar</lastName>
<age>20</age>
<address>
<streetAddress>18 2nd Street</streetAddress>
<city>New York</city>
<state>NY</state>
<postalCode>10021</postalCode>
</address>
<phoneNumbers>
<phoneNumber type="home">212 555-1234</phoneNumber>
<phoneNumber type="mobile">001 452-8819</phoneNumber>
</phoneNumbers>
<gender>
<type>female</type>
</gender>
</person>
</persons>
donut.json文件内容:
[
{
"id": "0001",
"type": "donut",
"name": "Cake",
"ppu": 0.55,
"batter": {
"batter": [
{"id": "1001", "type": "Regular"},
{"id": "1002", "type": "Chocolate"},
{"id": "1003", "type": "Blueberry"},
{"id": "1004", "type": "Devil's Food"}
]
},
"topping": [
{"id": "5001", "type": "None"},
{"id": "5002", "type": "Glazed"},
{"id": "5005", "type": "Sugar"},
{"id": "5007", "type": "Powdered Sugar"},
{"id": "5006", "type": "Chocolate with Sprinkles"},
{"id": "5003", "type": "Chocolate"},
{"id": "5004", "type": "Maple"}
]
},
{
"id": "0002",
"type": "donut",
"name": "Raised",
"ppu": 0.55,
"batters": {
"batter": [
{"id": "1001", "type": "Regular"}
]
},
"topping": [
{"id": "5001", "type": "None"},
{"id": "5002", "type": "Glazed"},
{"id": "5005", "type": "Sugar"},
{"id": "5003", "type": "Chocolate"},
{"id": "5004", "type": "Maple"}
]
},
{
"id": "0003",
"type": "donut",
"name": "Old Fashioned",
"ppu": 0.55,
"batters": {
"batter": [
{"id": "1001", "type": "Regular"},
{"id": "1002", "type": "Chocolate"}
]
},
"topping": [
{"id": "5001", "type": "None"},
{"id": "5002", "type": "Glazed"},
{"id": "5003", "type": "Chocolate"},
{"id": "5004", "type": "Maple"}
]
}
]
抽象工厂
# @Author : baili
# @File : abstact_factory
# @Time : 2023/2/5 17:12
class Frog:
def __init__(self, name):
self.name = name
def __str__(self):
return self.name
def interact_with(self, obstacle):
print(f'{self} the frog encounters {obstacle} and {obstacle.action()}!')
class Bug:
def __str__(self):
return 'a bug'
def action(self):
return 'eats it'
class FrogWorld:
def __init__(self, name):
print(self)
self.player_name = name
def __str__(self):
return '\n\n\t ------ Frog World ------'
def make_character(self):
return Frog(self.player_name)
def make_obstacle(self):
return Bug()
class Wizard:
def __init__(self, name):
self.name = name
def __str__(self):
return self.name
def interact_with(self, obstacle):
print(f'{self} the Wizard battles against {obstacle} and {obstacle.action()}!')
class Ork:
def __str__(self):
return 'an evil ork'
def action(self):
return 'kills it'
class WizardWorld:
def __init__(self, name):
print(self)
self.player_name = name
def __str__(self):
return '\n\n\t------ Wizard World ------'
def make_character(self):
return Wizard(self.player_name)
def make_obstacle(self):
return Ork()
class GameEnvironment:
def __init__(self, factory):
self.hero = factory.make_character()
self.obstacle = factory.make_obstacle()
def play(self):
self.hero.interact_with(self.obstacle)
def validate_age(name):
try:
age = input(f'Welcome {name}. How old are you?')
age = int(age)
except ValueError as e:
print(f'Age {age} is invalid, please try again...')
return False, age
return True, age
def main():
name = input("Hello. What's your name?")
valid_input = False
while not valid_input:
valid_input, age = validate_age(name)
game = FrogWorld if age < 18 else WizardWorld
environment = GameEnvironment(game(name))
environment.play()
if __name__ == '__main__':
main()
2. 建造者模式¶
# @Author : baili
# @File : builder
# @Time : 2023/2/5 17:33
from enum import Enum
import time
PizzaProgress = Enum('PizzaProgress', 'queued preparation baking ready')
PizzaDough = Enum('PizzaDough', 'thin thick')
PizzaSauce = Enum('PizzaSouce', 'tomato creme_fraiche')
PizzaTopping = Enum('PizzaTopping', 'mozzarella double_mozzarella bacon ham mushrooms red_onion oregano')
STEP_DELAY = 3
class Pizza:
def __init__(self, name):
self.name = name
self.dough = None
self.sauce = None
self.topping = []
def __str__(self):
return self.name
def prepare_dough(self, dough):
self.dough = dough
print(f'preparing the {self.dough.name} dough of your {self}...')
time.sleep(STEP_DELAY)
print(f'done with the {self.dough.name} dough')
class MargaritaBuilder:
def __init__(self):
self.pizza = Pizza('margarita')
self.progress = PizzaProgress.queued
self.baking_time = 5
def prepare_dough(self):
self.progress = PizzaProgress.preparation
self.pizza.prepare_dough(PizzaDough.thin)
def add_sauce(self):
print('adding the tomato sauce to you margarita...')
def add_topping(self):
print(f'adding the topping (double mozzarella, oregano) to your margarita')
self.pizza.topping.append([i for i in (PizzaTopping.double_mozzarella, PizzaTopping.oregano)])
time.sleep(STEP_DELAY)
print('done with the topping (double mozzarella, oregano)')
def bake(self):
self.progress = PizzaProgress.baking
print(f'baking your margarita for {self.baking_time} seconds')
time.sleep(STEP_DELAY)
self.progress = PizzaProgress.ready
print(f'your margarita is ready')
class CreamyBaconBuilder:
def __init__(self):
self.pizza = Pizza('creamy bacon')
self.progress = PizzaProgress.queued
self.baking_time = 7
def prepare_dough(self):
self.progress = PizzaProgress.preparation
self.pizza.prepare_dough(PizzaDough.thick)
def add_sauce(self):
print('adding the creme sauce to your creamy bacon')
self.pizza.sauce = PizzaSauce.creme_fraiche
time.sleep(STEP_DELAY)
print('done with the creme fraiche sauce')
def add_topping(self):
print('adding the topping (mozzarella, bacon, ham, mushrooms, red onion, oregano) to your creamy bacon')
self.pizza.topping.append([t for t in (PizzaTopping.mozzarella,
PizzaTopping.bacon,
PizzaTopping.ham,
PizzaTopping.mushrooms,
PizzaTopping.red_onion,
PizzaTopping.oregano)])
time.sleep(STEP_DELAY)
print('done with the topping (mozzarella, bacon, ham, mushrooms, red onion, oregano)')
def bake(self):
self.progress = PizzaProgress.baking
print(f'baking your creamy bacon for {self.baking_time} seconds')
time.sleep(self.baking_time)
self.progress = PizzaProgress.ready
print('your creamy bacon is ready')
class Waiter:
def __init__(self):
self.builder = None
def construct_pizza(self, builder):
self.builder = builder
for step in (builder.prepare_dough, builder.add_sauce, builder.add_topping, builder.bake):
step()
@property
def pizza(self):
return self.builder.pizza
def validate_style(builders):
try:
pizza_style = input('What pizza would you like, [m]argarita or [c]reamy bacon?')
builder = builders[pizza_style]()
valid_input = True
except KeyError as e:
print('Sorry, only margarita (key m) and creamy bacon (key c) are available')
return False, None
return True, builder
def main():
builders = dict(m=MargaritaBuilder, c=CreamyBaconBuilder)
valid_input = False
while not valid_input:
valid_input, builder = validate_style(builders)
print()
waiter = Waiter()
waiter.construct_pizza(builder)
pizza = waiter.pizza
print()
print(f'Enjoy your {pizza}!')
if __name__ == '__main__':
main()
3. 原型模式¶
# @Author : baili
# @File : prototype
# @Time : 2023/2/5 20:11
import copy
from collections import OrderedDict
class Book:
def __init__(self, name, authors, price, **rest):
'''rest的例子有出版商,出版日期,标签等等'''
self.name = name
self.authors = authors
self.price = price
self.__dict__.update(rest)
def __str__(self):
my_list = []
ordered = OrderedDict(sorted(self.__dict__.items()))
for i in ordered.keys():
my_list.append(f'{i}: {ordered[i]}')
if i == 'price':
my_list.append('$')
my_list.append('\n')
return ''.join(my_list)
class Prototype:
def __init__(self):
self.objects = dict()
def register(self, identifier, obj):
self.objects[identifier] = obj
def unregister(self, identifier):
del self.objects[identifier]
def clone(self, identifier, **attr):
found = self.objects.get(identifier)
if not found:
raise ValueError(f'Incorrect object identifier: {identifier}')
obj = copy.deepcopy(found)
obj.__dict__.update(attr)
return obj
def main():
b1 = Book('The C Programming Language', ('Brain W. Kernighan', 'Dennis M.Ritchie'),
price=118,
publisher='Prentice Hall',
publication_date='1978-02-22',
tags=('C', 'programming', 'algorithms', 'data structures'))
prototype = Prototype()
cid = 'k&r-first'
prototype.register(cid, b1)
b2 = prototype.clone(cid, name='The C Programming Language(ANSI)',
price=48.99,
publication_date='1988-04-01',
edition=2)
for b in (b1, b2):
print(b)
print(f'ID b1:{id(b1)} != ID b2 :{id(b2)}')
if __name__ == '__main__':
main()
【结构型】¶
4. 适配器模式¶
Adapter pattern,帮助我们实现两个不兼容接口之间的适配。
external.py
# @Author : baili
# @File : external
# @Time : 2023/2/5 20:30
class Synthesizer:
def __init__(self, name):
self.name = name
def __str__(self):
return f'the {self.name} synthesizer'
def play(self):
return 'is playing an electronic song'
class Human:
def __init__(self, name):
self.name = name
def __str__(self):
return f'{self.name} the human'
def speak(self):
return 'says hello'
adapter.py
from external import Synthesizer, Human
class Computer:
def __init__(self, name):
self.name = name
def __str__(self):
return f'{self.name} the computer'
def execute(self):
return 'execute a program'
class Adapter:
def __init__(self, obj, adapted_methods):
self.obj = obj
self.__dict__.update(adapted_methods)
def __str__(self):
return str(self.obj)
def main():
objects = [Computer('Apple')]
synth = Synthesizer('moog')
objects.append(Adapter(synth, dict(execute=synth.play)))
human = Human('Bob')
objects.append(Adapter(human, dict(execute=human.speak)))
for obj in objects:
print(f'{obj} {obj.execute()}')
if __name__ == '__main__':
main()
5. 装饰器模式¶
考虑对一个对象添加新的功能时,可选方案: 1. 修改类代码(bad) 2. 使用继承(bad) 3. 使用组合(better) 4. 使用装饰器(better)
import functools
def memoize(fn):
known = dict()
@functools.wraps(fn)
def memoizer(*args):
if args not in known:
known[args] = fn(*args)
return known[args]
return memoizer
@memoize
def nsum(n):
"""返回前n个数字的和"""
assert (n >= 0), 'n must be >= 0'
return 0 if n == 0 else n + nsum(n-1)
@memoize
def fibonacci(n):
"""返回斐波那契数列的第n个数"""
assert(n >= 0), 'n must be >= 0'
return n if n in (0, 1) else fibonacci(n-1) + fibonacci(n-2)
if __name__ == '__main__':
from timeit import Timer
messure = [
{
'exec': 'fibonacci(100)',
'import': 'fibonacci',
'func': fibonacci
},
{
'exec': 'nsum(200)',
'import': 'nsum',
'func': nsum
}
]
for m in messure:
t = Timer(f'{m["exec"]}', f'from __main__ import {m["import"]}')
print(f'name: {m["func"].__name__}, executing: {m["exec"]}, time: {t.timeit()}')
6. 外观模式¶
# @Author : baili
# @File : facade
# @Time : 2023/2/25 10:56
from enum import Enum
from abc import ABCMeta, abstractmethod
State = Enum('State', 'new running sleeping restart zombie')
class User:
pass
class Process:
pass
class File:
pass
class Server(metaclass=ABCMeta):
@abstractmethod
def __init__(self):
pass
def __str__(self):
return self.name
@abstractmethod
def boot(self):
pass
@abstractmethod
def kill(self, restart=True):
pass
class FileServer(Server):
def __init__(self):
"""初始化文件服务进程要求的操作"""
self.name = 'FileServer'
self.state = State.new
def boot(self):
"""启动文件服务进程要求的操作"""
print(f'booting the {self}')
def kill(self, restart=True):
"""终止文件服务进程要求的操作"""
print(f'Killing {self}')
def create_file(self, user, name, permissions):
"""检查访问权限的有效性、用户权限等"""
print(f'trying to create the file "{name}" for user "{user}" with permissions {permissions}')
class ProcessServer(Server):
def __init__(self):
"""初始化进程服务进程要求的操作"""
self.name = 'ProcessServer'
self.state = State.new
def boot(self):
"""启动进程服务要求的操作"""
print(f'booting the {self}')
self.state = State.running
def kill(self, restart=True):
print(f'Killing {self}')
self.state = State.restart if restart else State.zombie
def create_process(self, user, name):
print(f'trying to create the process "{name}" for user "{user}"')
class WindowServer:
pass
class NetworkServer:
pass
class OperatingSystem:
"""外观"""
def __init__(self):
self.fs = FileServer()
self.ps = ProcessServer()
def start(self):
for i in (self.fs, self.ps):
i.boot()
def create_file(self, user, name ,permissions):
return self.fs.create_file(user, name ,permissions)
def create_process(self, user, name):
return self.ps.create_process(user, name)
def main():
os = OperatingSystem()
os.start()
os.create_file('foo', 'hello', '-rw-r-r')
os.create_process('bar', 'ls /tmp')
if __name__ == '__main__':
main()
7. 享元模式¶
# @Author : baili
# @File : flyweight
# @Time : 2023/2/25 11:15
import random
from enum import Enum
TreeType = Enum('TreeType', 'apple_tree cherry_tree peach_tree')
class Tree:
pool = dict()
def __new__(cls, tree_type):
obj = cls.pool.get(tree_type, None)
if not obj:
obj = object.__new__(cls)
cls.pool[tree_type] = obj
obj.tree_type = tree_type
return obj
def render(self, age, x, y):
print(f'render a {self.tree_type} and age {age} at ({x}, {y})')
def main():
rand = random.Random()
age_min, age_max = 1, 30
min_point, max_point = 0, 100
tree_counter = 0
for _ in range(10):
t1 = Tree(TreeType.apple_tree)
t1.render(rand.randint(age_min, age_max),
rand.randint(min_point, max_point),
rand.randint(min_point, max_point))
tree_counter += 1
for _ in range(3):
t2 = Tree(TreeType.cherry_tree)
t2.render(rand.randint(age_min, age_max),
rand.randint(min_point, max_point),
rand.randint(min_point, max_point))
tree_counter += 1
for _ in range(5):
t3 = Tree(TreeType.peach_tree)
t3.render(rand.randint(age_min, age_max),
rand.randint(min_point, max_point),
rand.randint(min_point, max_point))
tree_counter += 1
print(f'trees rendered: {tree_counter}')
print(f'trees actually created: {len(Tree.pool)}')
t4 = Tree(TreeType.cherry_tree)
t5 = Tree(TreeType.cherry_tree)
t6 = Tree(TreeType.apple_tree)
print(f'{id(t4)} == {id(t5)}? {id(t4)==id(t5)}')
print(f'{id(t6)} == {id(t5)}? {id(t6)==id(t5)}')
if __name__ == '__main__':
main()
8. MVC模式¶
Model-View-Controller
模型:代表应用的信息本源,包含业务逻辑,数据,状态以及应用的规则。 视图:模型的可视化表现,只是展示数据,不处理数据。 控制器:负责衔接模型和视图之间的通信。
实现MVC时,请确保模型很智能,控制器很瘦,视图很傻瓜。
智能模型 - 包含所有的校验/业务规则/逻辑 - 处理应用的状态 - 访问应用数据(数据库) - 不依赖UI
瘦控制器 - 在用户与视图交互时,更新模型 - 在模型改变时,更新视图 - 如果需要,在数据传递给模型/视图之前进行处理 - 不展示数据 - 不直接访问应用数据 - 不包含校验/业务规则/逻辑
傻瓜视图 - 展示数据 - 允许用户与其交互 - 仅做最小的数据处理(简单变量和循环) - 不存储任何数据 - 不直接访问应用数据 - 不包含校验/业务规则/逻辑
# @Author : baili
# @File : mvc
# @Time : 2023/2/25 11:49
quotes = (
'A man is not complete until he is married. Then he is finished.',
'As I said before, I never repeat myself.',
'Behind a successful man is an exhausted woman.',
'Black holes really suck...',
'Facts are stubborn things.'
)
class QuoteModel:
def get_quote(self, n):
try:
value = quotes[n]
except IndexError as e:
value = 'Not found!'
return value
class QuoteTerminalView:
def show(self, quote):
print(f'And the quote is: "{quote}"')
def error(self, msg):
print(f'Error: {msg}')
def select_quote(self):
return input('Which quote number would you like to see?')
class QuoteTerminalController:
def __init__(self):
self.model = QuoteModel()
self.view = QuoteTerminalView()
def run(self):
valid_input = False
while not valid_input:
try:
n = self.view.select_quote()
n = int(n)
valid_input = True
except ValueError as e:
self.view.error(f'Incorrect index "{n}"')
quote = self.model.get_quote(n)
self.view.show(quote)
def main():
controller = QuoteTerminalController()
while True:
controller.run()
if __name__ == '__main__':
main()
9. 代理模式¶
# @Author : baili
# @File : proxy
# @Time : 2023/2/25 11:59
class SensitiveInfo:
def __init__(self):
self.users = ['nick', 'tom', 'ben', 'mike']
def read(self):
print(f'There are {len(self.users)} users: {" ".join(self.users)}')
def add(self, user):
self.users.append(user)
print(f'Added user {user}')
class Info:
"""SensitiveInfo的保护代理"""
def __init__(self):
self.protected = SensitiveInfo()
self.secret = '123456'
def read(self):
self.protected.read()
def add(self, user):
sec = input('what is the secret ? ')
self.protected.add(user) if sec == self.secret else print("That's wrong!")
def main():
info = Info()
while True:
print('1. read list |==| 2. add user |==| 3. quit')
key = input('choose option: ')
if key == '1':
info.read()
elif key == '2':
name = input('choose usernames: ')
info.add(name)
elif key == '3':
exit()
else:
print(f'unknown option: {key}')
if __name__ == '__main__':
main()
【行为型】¶
- 责任链模式
# @Author : baili
# @File : chain
# @Time : 2023/2/25 14:40
class Event:
def __init__(self, name):
self.name = name
def __str__(self):
return self.name
class Widget:
def __init__(self, parent=None):
self.parent = parent
def handle(self, event):
handler = f'handle_{event}'
if hasattr(self, handler):
method = getattr(self, handler)
method(event)
elif self.parent:
self.parent.handle(event)
elif hasattr(self, 'handle_default'):
self.handle_default(event)
class MainWindow(Widget):
def handle_close(self, event):
print(f'MainWindow: {event}')
def handle_default(self, event):
print(f'SendDialog: {event}')
class SendDialog(Widget):
def handle_paint(self, event):
print(f'SendDialog: {event}')
class MsgText(Widget):
def handle_down(self, event):
print(f'MsgText: {event}')
def main():
window = MainWindow()
dialog = SendDialog(window)
msg = MsgText(dialog)
for e in ('down', 'paint', 'unhandled', 'close'):
evt = Event(e)
print(f'\nSending event -{evt}- to MainWindow')
window.handle(evt)
print(f'Sending event -{evt}- to SendDialog')
dialog.handle(evt)
print(f'Sending event -{evt}- to MsgText')
msg.handle(evt)
if __name__ == '__main__':
main()
- 命令模式
# @Author : baili
# @File : command
# @Time : 2023/2/25 14:49
import os
verbose = True
class RenameFile:
def __init__(self, path_src, path_dest):
self.src, self.dest = path_src, path_dest
def execute(self):
if verbose:
print(f'[renaming "{self.src}" to "{self.dest}"]')
os.rename(self.src, self.dest)
def undo(self):
if verbose:
print(f'[renaming "{self.dest}" back to "{self.src}"]')
os.rename(self.dest, self.src)
class CreateFile:
def __init__(self, path, txt='hello world \n'):
self.path, self.txt = path, txt
def execute(self):
if verbose:
print(f'[creating file "{self.path}"]')
with open(self.path, mode='w', encoding='utf-8') as out_file:
out_file.write(self.txt)
def undo(self):
delete_file(self.path)
class ReadFile:
def __init__(self, path):
self.path = path
def execute(self):
if verbose:
print(f'[reading file "{self.path}"]')
with open(self.path, mode='r', encoding='utf-8') as in_file:
print(in_file.read(), end='')
def delete_file(path):
if verbose:
print(f'deleting file "{path}"')
os.remove(path)
def main():
origin_name, new_name = 'file1', 'file2'
commands = []
for cmd in CreateFile(origin_name), ReadFile(origin_name), RenameFile(origin_name, new_name):
commands.append(cmd)
for c in commands:
c.execute()
answer = input('reverse the executed commands? [y/n] ')
if answer not in 'yY':
print(f'the result is {new_name}')
exit()
for c in reversed(commands):
try:
c.undo()
except AttributeError as e:
pass
if __name__ == '__main__':
main()
- 解释器模式
# @Author : baili
# @File : interpreter
# @Time : 2023/2/25 15:15
from pyparsing import alphanums, Word, OneOrMore, Group, Suppress, Optional
class Boiler:
def __init__(self):
self.temperature = 83
def __str__(self):
return f'boiler temperature: {self.temperature}'
def increase_temperature(self, amount):
print(f"increasing the boiler's temperature by {amount} degrees")
self.temperature += amount
def decrease_temperature(self, amount):
print(f"deceasing the boiler's temperature by {amount} degrees")
def main():
word = Word(alphanums)
command = Group(OneOrMore(word))
token = Suppress('->')
device = Group(OneOrMore(word))
argument = Group(OneOrMore(word))
event = command + token + device + Optional(token + argument)
boiler = Boiler()
print(boiler)
cmd, dev, arg = event.parseString('increase -> boiler temperature -> 3 degrees')
if 'increase' in ''.join(cmd):
if 'boiler' in ''.join(dev):
boiler.increase_temperature(int(arg[0]))
print(boiler)
if __name__ == '__main__':
main()
- 观察者模式
# @Author : baili
# @File : observer
# @Time : 2023/2/25 15:26
class Publisher:
def __init__(self):
self.observers = []
def add(self, observer):
if observer not in self.observers:
self.observers.append(observer)
else:
print(f'Failed to add: {observer}')
def remove(self, observer):
try:
self.observers.remove(observer)
except ValueError as e:
print(f'Failed to remove: {observer}')
def notify(self):
for ob in self.observers:
ob.notify(self)
class DefaultFormatter(Publisher):
def __init__(self, name):
Publisher.__init__(self)
self.name = name
self._data = 0
def __str__(self):
return f'{type(self).__name__}: "{self.name}" has data = {self._data}'
@property
def data(self):
return self._data
@data.setter
def data(self, new_value):
try:
self._data = int(new_value)
except ValueError as e:
print(f'Error: {e}')
else:
self.notify()
class HexFormatter:
def notify(self, publisher):
print(f"{type(self).__name__}: {publisher.name} has now hex data = {hex(publisher.data)}")
class BinaryFormatter:
def notify(self, publisher):
print(f"{type(self).__name__}: {publisher.name} has now hex data = {hex(publisher.data)}")
def main():
df = DefaultFormatter('test1')
print(df)
print()
hf = HexFormatter()
df.add(hf)
df.data = 3
print(df)
print()
bf = BinaryFormatter()
df.add(bf)
df.data = 21
print(df)
print()
df.remove(hf)
df.data = 40
print(df)
print()
df.remove(hf)
df.add(bf)
df.data = 'hello'
print(df)
print()
df.data = 15.8
print(df)
if __name__ == '__main__':
main()
- 状态模式
# @Author : baili
# @File : state
# @Time : 2023/2/25 15:37
# pip install state_machine
from state_machine import State, Event, acts_as_state_machine, after, before, InvalidStateTransition
@acts_as_state_machine
class Process:
created = State(initial=True)
waiting = State()
running = State()
terminated = State()
blocked = State()
swapped_out_waiting = State()
swapped_out_blocked = State()
wait = Event(from_states=(created, running, blocked, swapped_out_waiting),
to_state=waiting)
run = Event(from_states=waiting, to_state=terminated)
terminate = Event(from_states=running, to_state=terminated)
block = Event(from_states=(running, swapped_out_blocked),
to_state=blocked)
swap_wait = Event(from_states=waiting, to_state=swapped_out_waiting)
swap_block = Event(from_states=blocked, to_state=swapped_out_blocked)
def __init__(self, name):
self.name = name
@after('wait')
def wait_info(self):
print(f'{self.name} entered waiting mode')
@after('run')
def run_info(self):
print(f'{self.name} is running')
@before('terminate')
def terminate_info(self):
print(f'{self.name} terminated')
@after('block')
def block_info(self):
print(f'{self.name} is blocked')
@after('swap_wait')
def swap_wait_info(self):
print(f'{self.name} is swapped out and waiting')
@after('swap_block')
def swap_block_info(self):
print(f'{self.name} is swapped out and blocked')
def transition(process, event, event_name):
try:
event()
except InvalidStateTransition as e:
print(f'Error: transition of {process.name} from {process.current_state} to {event_name} failed')
def state_info(process):
print(f'state of {process.name}: {process.current_state}')
def main():
RUNNING = 'running'
WAITING = 'waiting'
BLOCKED = 'blocked'
TERMINATED = 'terminated'
p1, p2 = Process('process1'), Process('process2')
for p in (p1, p2):
state_info(p)
transition(p1, p1.wait, WAITING)
transition(p2, p2.terminate, TERMINATED)
for p in (p1, p2):
state_info(p)
transition(p1, p1.run, RUNNING)
transition(p2, p2.wait, WAITING)
for p in (p1, p2):
state_info(p)
transition(p1, p1.run, RUNNING)
for p in (p1, p2):
state_info(p)
transition(p1, p1.block, BLOCKED)
for p in (p1, p2):
state_info(p)
transition(p1, p1.terminate, TERMINATED)
for p in (p1, p2):
state_info(p)
if __name__ == '__main__':
main()
- 策略模式
# @Author : baili
# @File : strategy
# @Time : 2023/2/25 17:28
import time
SLOW = 3
LIMIT = 5
WARNING = 'too bad, you picked the slow algorithm :('
def pairs(seq):
n = len(seq)
for i in range(n):
yield seq[i], seq[(i + 1) % n]
def allUniqueSort(s):
if len(s) > LIMIT:
print(WARNING)
srt_str = sorted(s)
for c1, c2 in pairs(srt_str):
if c1 == c2:
return False
return True
def allUniqueSet(s):
if len(s) < LIMIT:
print(WARNING)
time.sleep(SLOW)
return True if len(set(s)) == len(s) else False
def allUnique(s, strategy):
return strategy(s)
def main():
while True:
word = None
while not word:
word = input('Insert word (type quit to exit)> ')
if word == 'quit':
print('bye')
return
strategy_picked = None
strategies = {
'1': allUniqueSet,
'2': allUniqueSort
}
while strategy_picked not in strategies:
strategy_picked = input('Choose strategy: [1]Use a set, [2]Sort and pair >')
try:
strategy = strategies[strategy_picked]
print(f'allUnique({word}): {allUnique(word, strategy)}')
except KeyError as e:
print(f'Incorrect option: {strategy_picked}')
if __name__ == '__main__':
main()
- 模板模式
# @Author : baili
# @File : template
# @Time : 2023/2/25 17:41
# pip install cowpy
from cowpy import cow
def dots_style(msg: str):
msg = msg.capitalize()
msg = '.' * 10 + msg + '.' * 10
return msg
def admire_style(msg):
msg = msg.upper()
return '!'.join(msg)
def cow_style(msg):
msg = cow.milk_random_cow(msg)
return msg
def generate_banner(msg, style=dots_style):
print('-- start of banner --')
print(style(msg))
print('-- end of banner --')
def main():
msg = 'happy coding'
for style in dots_style, admire_style, cow_style:
generate_banner(msg, style)
if __name__ == '__main__':
main()