diff --git a/test22.py b/test22.py new file mode 100644 index 0000000..9973dce --- /dev/null +++ b/test22.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +'练习面向对象高级特性的@property' + +__author__ = 'sergiojune' + + +class Student(object): + def __init__(self, name, score): + self.__name = name + self.__score = score + + def get_name(self): + return self.__name + + def get_score(self): + return self.__score + + def set_name(self, name): + if not isinstance(name, str): + raise ValueError('请输入正确的名字') + self.__name = name + + def set_score(self, score): + if score < 0 or score> 100: + raise ValueError('请输入正确的成绩') + elif not isinstance(score, int): + raise ValueError('请输入正确的成绩') + else: + self.__score = score + + +stu = Student('bob', 86) +print(stu.get_name()) +# 这个就会报错 +# stu.set_score(999) + + +# 使用@property装饰器 +class People(object): + def __init__(self, name, age): + self.__name = name + self.__age = age + + @property # 添加装饰器,让这个方法变成一个属性 + def age(self): + return self.__age + + @property + def name(self): + return self.__name + + @name.setter # 这个前缀名字要和property装饰器的方法名字一致 + def name(self, name): + if not isinstance(name, str): + raise ValueError('请输入正确名字') + self.__name = name + + +p = People('bart', 20) +# 加了装饰器之后这样直接调用属性 +print(p.name) # 这个就是直接获取name属性 +p.name = 'bat' # 直接修改属性 +print(p.name) +print(p.age) +# 由于age只是只读,不予许写,所以这个会报错 +# p.age = 52 + + +# 作业:请利用@property给一个Screen对象加上width和height属性,以及一个只读属性resolution +class Screen(object): + def __init__(self): + self.__width = None + self.__height = None + + @property + def width(self): + return self.__width + + @property + def height(self): + return self.__height + + @width.setter + def width(self,width): + if width < 0 or width> 1000: + raise ValueError('请输入正确的宽') + self.__width = width + + @height.setter + def height(self, height): + if height < 0 or height> 1000: + raise ValueError('请输入正确的高') + self.__height = height + + @property + def resolution(self): + self.__resolution = (self.__height, self.__width) + return self.__resolution + + +screen = Screen() +screen.width = 25 +screen.height = 65 +print(screen.width) +print(screen.height) +print(screen.resolution) diff --git a/test23.py b/test23.py new file mode 100644 index 0000000..60895ad --- /dev/null +++ b/test23.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +'练习面向对象高级特性的多继承' + +__author__ = 'sergiojune' + + +class Animal(object): + pass + + +class Bird(Animal): + pass + + +class Mammal(Animal): + pass + + +# 加个扩展功能 +class Flyable(object): + def fly(self): + print('fly------------') + + +class Runnable(object): + def run(self): + print('run-----------') + + +# 这样就是多继承,加了和功能 +class Dog(Mammal, Runnable): + pass + + +# 多继承从做往右继承,在父类中有相同的方法时就会先实现左边的方法,就近原则 +class Parrot(Bird, Flyable): + pass + diff --git a/test24.py b/test24.py new file mode 100644 index 0000000..84954b6 --- /dev/null +++ b/test24.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python 3 +# -*- coding: utf-8 -*- + +'练习面向对象的高级特性的定制类' + +__author__ = 'sergiojune' + + +class Fib(object): + # 在实例化时会调用该方法 + def __init__(self): + self.name = 'Fib' + self.a = 0 + self.b = 1 + + # 在调用len方法时实际调用的是该方法 + def __len__(self): + return 100 + + # 在打印实例信息时调用该方法 + def __str__(self): + return 'object name is %s' % (self.name) + + # 因为一般这两个方法代码一样,所以这样做 + __repr__ = __str__ + + # 这个方法让实例可迭代,返回的是一个可迭代对象 + def __iter__(self): + return self + + # 这个方法当实例用于for循环时会被调用,与list,tuple类似 + def __next__(self): + self.a, self.b = self.b, self.a + self.b + # 到达指定数目就返回一个异常 + if self.a> 100000: + raise StopIteration + return self.a + + # 该方法当实例在用下标取元素时被调用,传入切片也是调用这个函数 + # 该方法也可以像dict那样用键来获取值,判断参数是否是字符串即可 + def __getitem__(self, num): + # 对参数进行判断 + if isinstance(num, slice): + start = num.start + end = num.stop + step = num.step + # 开头可能不填 + start = start if start else 0 + # 步长也一样 + step = step if step else 0 + L = [] + if step == 0: + for x in range(start, end): + L.append(Fib()[x]) + else: + for x in range(start, end, step): + L.append(Fib()[x]) + return L + else: + n = 0 + for x in Fib(): + if n == num: + return x + n += 1 + + # 该方法当用下标设置对应值时被调用,也可以像dict一样的赋值 + def __setitem__(self, index, value): + # 和上面的差不多,就不实现了 + pass + + # __delitem__():方法是当删除某个值时就会被调用 + + +fib = Fib() +print(len(fib)) +# 打印实例 +print(Fib()) +# 打印有变量指向的实例信息 +print(fib) +# 用类实现斐波那契数列 +for x in fib: + print(x) +print('-----------') +print(fib[5]) +# 切片 +print(fib[1:5]) +# 有步长 +print(fib[5:1:-2]) + + +class Student(object): + def __init__(self, name): + self.name = name + + # 这个方法当实例方法不存在的属性时被调用 + def __getattr__(self, name): + if name == 'score': + return 99 + raise AttributeError('\'Student\' object has no attribute \'%s\'' % name) + + # 这个方法就是当实例当函数被调用时调用这个方法,还可以传参数 + def __call__(self): + print('call Student') + + +stu = Student('bart') +print(stu) +# 访问不存在的属性,加了__getattr__方法后就不会报错 +print(stu.score) +# print(stu.j) +# 将实例当函数调用 +stu() +# 判断对象是否可被调用 +print(callable(stu)) +print(callable([1])) +print(callable(fib)) + + +# 实现教程中的Chain().users('michael').repos 得到 GET /users/:user/repos +class Chain(): + def __init__(self, path='GET'): + self.path = path + + def __getattr__(self, path): + if path: + return Chain('%s/%s' % (self.path, path)) + + def __str__(self): + return self.path + + def __call__(self, user): + return Chain('%s/:%s' % (self.path, user)) + + +chain = Chain() +print(Chain().users('michael').repos) diff --git a/test25.py b/test25.py new file mode 100644 index 0000000..b3db837 --- /dev/null +++ b/test25.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python 3 +# -*- coding: utf-8 -*- + +'练习面向对象的高级特性的枚举' + +__author__ = 'sergiojune' +from enum import Enum, unique + + +# 使用枚举 +# +Month = Enum('Month', ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec')) +# 获取每一个成员,value是自动生成的,从 1 开始 +for month,member in Month.__members__.items(): + print(month, '=>', member, member.value) + + +# 可以继承枚举来实现枚举类 +@unique # 这个装饰器是让枚举的键固定值,并且不能相同 +class Weekday(Enum): + # 这样就实现了枚举类 + Sun = 0 + Mon = 1 + Tue = 2 + Wed = 3 + Thu = 4 + Fri = 5 + Sat = 6 + + +# 访问枚举类的值 +print(Weekday.Sun) +print(Weekday(1)) +print(Weekday['Wed']) +num =4 +# 这个访问值 +print(Weekday(4).value == num) +# 这个访问键 +print(Weekday(4) == num) +for day, member in Weekday.__members__.items(): + print(day, '=>', member) + + +# 作业:把Student的gender属性改造为枚举类型,可以避免使用字符串 +class Gender(Enum): + Male = 0 + Female = 1 + + +class Student(object): + def __init__(self, name, gender): + self.name = name + self.gender = gender + + +# 测试: +bart = Student('Bart', Gender.Male) +if bart.gender == Gender.Male: + print('测试通过!') +else: + print('测试失败!') diff --git a/test26.py b/test26.py new file mode 100644 index 0000000..7aad1ae --- /dev/null +++ b/test26.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +'练习错误处理' + +__author__ = 'sergiojune' +import logging +from functools import reduce + + +def foo(n): + # 自定义抛出错误 + if n == 0: + raise ZeroDivisionError('被除数不能为零') + return 10 / n + + +print(foo(20)) +# 这个就会出错 +# print(foo(0)) + + +# 捕捉错误 +try: + n = input('请输入一个数字') + n = int(n) + res = 10 / n + print(res) +# 有多种异常情况,所以有多个except语句块 +except ZeroDivisionError as e: + print(e) +except ValueError as e: + # 用这个记录错误,开发中会发在日志上查看 + logging.exception(e) + +# 当没有发生错误时会执行 +else: + print('成功运行') +# 这个语句块是必须执行的 +finally: + print('代码执行完毕') + +print('end') + + +# 还可以自己创建异常 +# 只需要继承自某一个异常就可以了,不过一般不需要自定义异常 +class FooException(BaseException): + pass + + +# 作业:运行下面的代码,根据异常信息进行分析,定位出错误源头,并修复 +def str2num(s): + try: + return int(s) + except ValueError as e: + logging.exception(e) + print('捕捉成功') + try: + return float(s) + except ValueError as e: + print(e) + print('输入的内容不是数字') + + +def calc(exp): + try: + ss = exp.split('+') + ns = map(str2num, ss) + return reduce(lambda acc, x: acc + x, ns) + except TypeError as e: + print(e) + + +def main(): + r = calc('100 + 200 + 345') + print('100 + 200 + 345 =', r) + r = calc('99 + 88 + 7.6') + print('99 + 88 + 7.6 =', r) + + +main() +print('end') diff --git a/test27.py b/test27.py new file mode 100644 index 0000000..03823b1 --- /dev/null +++ b/test27.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +'练习调试程序' + +__author__ = 'sergiojune' +import logging +import pdb +# 这个标明打印信息的类别,不标明不会打印具体的错误信息 +logging.basicConfig(level=logging.INFO) + + +def foo(n): + # if n == 0: + # # 这个麻烦,可以用断言,这样没有输出 + # print('n is zero !') + # else: + # return 10 / n + # 这个当条件为false时,就会执行后面语句,并抛出assertionError + assert n != 0, 'n is zero !' + return 10/n + + +foo(1) + +s = '0' +n = int(s) +# 开发中用这个较多 +logging.info('n = %d' % n) +print(10 / n) + +# 还可以用pdb进行调试 +s = '0' +n = int(s) +# 这个就是设置断点 +pdb.set_trace() +logging.info('n = %d' % n) +print(10 / n) diff --git a/test28.py b/test28.py new file mode 100644 index 0000000..abdd392 --- /dev/null +++ b/test28.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +'练习读取文件' + +__author__ = 'sergiojune' + + +# 默认编码方式是gbk,而编译器是utf-8,所以需要指定编码方式 +f = open('./test27.py', 'r', encoding='utf-8') +# 读取文件全部内容,有一个参数为读取文件的大小 +# 当文件较小时可以这样读 +# txt = f. read() +# print(txt) + +# 还可以这样一行一行读 +print(f.readline()) +# 以一行行形式读全部内容 +for line in f.readlines(): + print(line) +# 最后记得关闭文件 +f.close() + + +# 当文件出现异常时,或许关闭不了文件,就需要捕捉异常 +try: + f = open('./test26.py', 'r', encoding='utf-8') + for x in f.readlines(): + print(x) +except IOError as e: + print(e) +finally: + # 最后一定关闭 + if f: + f.close() + + +# 如果每次都需要这样捕捉异常,就会很麻烦,python中可以用with语句块来处理 +with open('./test27.py', 'r', encoding='utf-8') as f: + # 当离开这个语句块就会自动关闭,就不需要我们来关闭 + txt = f.read() + print(txt) + + +# 作业:请将本地一个文本文件读为一个str并打印出来 +# 上面的就是了,参考上面的 diff --git a/test29.py b/test29.py new file mode 100644 index 0000000..06765f7 --- /dev/null +++ b/test29.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +'练习读取StingIO和BytesIO' + +__author__ = 'sergiojune' +from io import StringIO, BytesIO + +# StingIO和BytesIO 和在内存中读取数据,不是在文件中读取数据 +s = StringIO('hello\nhi\ngood!\nheadsome boy') # 创建 +# 进行读取,与文件读取差不多 +for x in s.readlines(): + print(x) +# 也可以这样创建 +s = StringIO() +s.write('hello') +s.write(' ') +s.write('world') +print(s.getvalue()) + + +# 这时BytesIO的练习,bytesIO是写入字节码,而SrtingIO是写入str +b = BytesIO('中文'.encode('utf-8')) +# print(b.getvalue()) +print(b.read()) diff --git a/test30.py b/test30.py new file mode 100644 index 0000000..56c45da --- /dev/null +++ b/test30.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +'练习操作文件和目录' + +__author__ = 'sergiojune' +import os +from datetime import datetime + +# 获取操作系统的类型 +print(os.name) +# window操作系统没有这个函数,在mac,linux下就会有 +# print(os.uname()) + +# 环境变量 +print(os.environ) +# 获取某个环境变量的值 +print(os.environ.get('path')) + +# 查看当前目录的绝对路径 +print(os.path.abspath('.')) # . 表示当前路径 + +# 添加目录 +name = os.path.join(os.path.abspath('.'), 'testtest') # 这步是解决不同系统目录名不一样的写法,是以目录形式合并两个参数 +print(name) +# 用上面的结果来添加目录 +# os.mkdir(name) +# 删除目录 +# os.rmdir(name) + +name = os.path.join(os.path.abspath('.'), 'test29.py') +print(name) +# 分割路径,会分出文件名和目录名 +l = os.path.split(name) +print(l) +# 也可以直接分出文件名的后缀 +t = os.path.splitext(name) +print(t) +# 重命名 +# os.rename('test.txt', 'test.md') +# 删除文件 +# os.remove('test.md') +# 找出目标路径是目录的名字 +d = [x for x in os.listdir(r'E:\anaconda\python_project') if not os.path.isdir(x)] +print(d) + + +# 作业 1 :利用os模块编写一个能实现dir -l输出的程序。 +print('%s%30s' % ('type', 'name')) +for x in os.listdir(r'E:\anaconda\python_project'): + # 判断是否是文件 + if os.path.isfile(x): + file = os.path.splitext(x)[1] + file = file.split('.')[1] + print('%s%30s' % (file+' file', x)) + else: + print('%s%30s' % ('folder', x)) + + +# 作业2:编写一个程序,能在当前目录以及当前目录的所有子目录下查找文件名包含指定字符串的文件,并打印出相对路径 +def find_dir(path, name, dirs=[]): + for x in os.listdir(path): + if os.path.isfile(path+'\\'+x): + # 文件中有指定字符串 + if name in x: + dirs.append(path+'\\'+x) + # 文件夹 + else: + if name in x: + dirs.append(path+'\\'+x) + # 递归 + find_dir(os.path.join(path, x), name) + return dirs + + +d = find_dir(r'E:\anaconda\python_project', 'py') +print(d) + +# 获取文件创建的时间 +print(os.path.getmtime(d[0])) +print(datetime.fromtimestamp(os.path.getmtime(d[0])).strftime('%Y-%m-%d %H:%M:%S') ) +# 获取文件大小 +print(os.path.getsize(d[0])//1024, 'KB') + diff --git a/test31.py b/test31.py new file mode 100644 index 0000000..6f93239 --- /dev/null +++ b/test31.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +'练习序列化和反序列化' + +__author__ = 'sergiojune' +import pickle +import json +d = {'name': 'bob', 'age': 20, 'score': 80} +# 序列化,这个是序列化成bytes +p = pickle.dumps(d) +print(p) +# 这个序列化后直接存在指定文件 +with open('pickle.txt', 'wb') as f: + pickle.dump(d, f) + +# 反序列化 +# 这个是将bytes反序列化 +p = pickle.loads(p) +print(p) +# 这个是将文件反序列化 +with open('pickle.txt', 'rb') as f: + d = pickle.load(f) + print(d) + + +# 若要在个语言之间相互传递对象,这时就需要序列化成JSON或XML等格式 +# 这里序列化成JSON,用的是json库 +d = {'name': 'bart', 'age': 22, 'score': 76} +j = json.dumps(d) +print(j) +# 序列化后写入文件 +with open('json.txt', 'w') as f: + json.dump(d, f) +# 反序列化 +f = json.loads(j) +print(f) +# 从文件中反序列化 +with open('json.txt', 'r') as f: + w = json.load(f) + print('文件:', w) + + +# 序列化自定义类 +class Student(object): + def __init__(self, name, age, score): + self.name = name + self. age = age + self.score = score + + +s = Student('tom', 23, 68) +# 这个是获取对象的字典形式 +print(s.__dict__) +# 直接序列化会出错,抛TypeError: Object of type 'Student' is not JSON serializable,说该对象不可序列化 +# o = json.dumps(s) +# 对自定义对象序列化需要自己先将对象转换成字典才能序列化 + + +# 将student转换成字典 +def student2dict(obj): + return {'name': obj.name, + 'age': obj.age, + 'score': obj.score} + + +# 现在序列化 +o = json.dumps(s, default=student2dict) +print(o) +# 可以利用类的特性和匿名函数一行代码进行序列化 +o1 = json.dumps(s, default=lambda obj: obj.__dict__) +print('简易序列化', o1) + +# 反序列化 + + +# 在自定义类中,同样也需要加个函数将序列化后的字典转换成对象的对象 +def dict2student(d): + return Student(d['name'], d['age'], d['score']) + + +fan = json.loads(o, object_hook=dict2student) +# 这样就获取到student对象 +print(fan) + + +# 作业:对中文进行JSON序列化时,json.dumps()提供了一个ensure_ascii参数,观察该参数对结果的影响 +obj = dict(name='小明', age=20) +# 当ensure_ascii为True,会对中文进行编码 +s = json.dumps(obj, ensure_ascii=True) +print(s) +# 当ensure_ascii为False,不会对中文进行编码 +s = json.dumps(obj, ensure_ascii=False) +print(s) diff --git a/test32.py b/test32.py new file mode 100644 index 0000000..4792c79 --- /dev/null +++ b/test32.py @@ -0,0 +1,79 @@ +#!/ussr/bin/env python 3 +# -*- coding: utf-8 -*- + +'练习多进程与进程间通信' + +__author__ = 'sergiojune' +from multiprocessing import Process, Pool, Queue +import os +import time +from random import random + + + +# print('parent process(%s) is running' % os.getpid()) +# 定义子进程需要运行的函数 +# def run(name): +# print('I am runing,I is process(%s)' % os.getpid()) +# 在使用这个进程时,需要使用if __name__ == '__main__这个语句来开父进程,要不会出错 +# if __name__ == '__main__': +# # 创建子进程 +# p = Process(target=run, args=('test',)) +# # 开启进程 +# p.start() +# # 让子进程运行完再运行下面的父进程 +# p.join() +# print('End...........') + + + +# 使用进程池批量开启进程 +# def run(name): +# print('task %s is running, process %s' % (name, os.getpid())) +# start = time.time() +# time.sleep(random()*3) +# end = time.time() +# print('%s is run %0.2f seconds process %s' % (name, (end - start), os.getpid())) +# +# +# if __name__ == '__main__': # 开启多进程这个语句是必须的 +# print('parent process %s is running' % os.getpid()) +# # 创建进程池 +# p = Pool(4) +# for x in range(5): +# p.apply_async(run, args=(x,)) +# # 关闭进程池,不能再添加进程 +# p.close() +# # 要实现这个方法之前必须关闭进程池 +# p.join() +# print('parent process %s end' % os.getpid()) + + + +# 实现进程间通信 +def write(q): + print('process %s is writing' % os.getpid()) + for x in 'ABC': + q.put(x) + print(' %s write %s' % (os.getpid(), x)) + + +def read(q): + print('process %s is read' % os.getpid()) + while True: + value = q.get(True) + print('info is %s' % value) + + +if __name__ == '__main__': + q = Queue() + print('parent is running') + pw = Process(target=write, args=(q,)) + pr = Process(target=read, args=(q,)) + # 开启进程 + pw.start() + pr.start() + pw.join() + # 由于read方法不能自行停止,所以需要强制tingz + pr.terminate() + print('end------') diff --git a/test33.py b/test33.py new file mode 100644 index 0000000..8837229 --- /dev/null +++ b/test33.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +'练习多线程' + +__author__ = 'sergiojune' +import threading +import multiprocessing + + +def run(): + print('thread %s is running' % threading.current_thread().name) + for x in range(5): + print('thread %s ==> %d' % (threading.current_thread().name, x)) + print('thread %s end' % threading.current_thread().name) + + +print('thraed %s is running' % threading.current_thread().name) +t = threading.Thread(target=run, name='loopthread') +# 开启线程 +t.start() +t.join() +print('thread %s is end' % threading.current_thread().name) + + +# 多线程的锁 +balance = 0 +def change(n): + global balance + balance = balance + n + balance = balance - n + print(balance) + + +def run_thread(): + l = threading.Lock() + for x in range(1, 100000): + try: + # 获取锁 + l.acquire() + change(x) + finally: + # 释放锁 + l.release() + + +t1 = threading.Thread(target=run_thread) +t2 = threading.Thread(target=run_thread()) +t1.start() +t2.start() +t1.join() +t2.join() +print('end') + + +def loop(): + x = 0 + while True: + x = x ^ 1 + + +for x in range(multiprocessing.cpu_count()): # 根据cpu的数量来开线程 + t = threading.Thread(target=loop) + t.start() + diff --git a/test35.py b/test35.py new file mode 100644 index 0000000..c208150 --- /dev/null +++ b/test35.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +'练习分布式进程 ' + +__author__ = 'sergiojune' +import queue, random +from multiprocessing.managers import BaseManager +# 此文件用来发送和接受结果,test36.py用于处理结果 + +# 创建通信工具 +# 发送任务 +post_task = queue.Queue() +# 接受结果 +result_task = queue.Queue() + + +class QueueManager(BaseManager): + pass + + +# 定义的函数解决下面的坑 +def posttq(): + return post_task + + +def resulttq(): + return result_task + + +def start(): + # 注册任务 + # 这里有个坑,在window系统下callable不能为匿名函数,原因是不能被序列化,所以在这里我们需要定义函数 + QueueManager.register('post_task_queue', callable=posttq) # 第一个参数为注册名字 + QueueManager.register('result_task_queue', callable=resulttq) + + # 绑定窗口,设置验证码 + manager = QueueManager(address=('127.0.0.1', 500), authkey=b'abc') # 第一个参数为地址和端口,第二个参数为验证码,防止别人骚扰 + + # 启动管理 + manager.start() + # 通过管理器获取通信 + post = manager.post_task_queue() + result = manager.result_task_queue() + + # 进行发送数据 + print('try post data') + for x in range(10): + n = random.randint(1, 1000000) + print('put %d' % n) + post.put(n) + + # 接受结果 + print('try get result') + for x in range(10): + # timeout表示超时获取数的最长时间 + value = result.get(timeout=10) + print('get result', value) + + # 关闭管理器 + manager.shutdown() + print('master end') + + +if __name__ == '__main__': + start() diff --git a/test36.py b/test36.py new file mode 100644 index 0000000..843fabc --- /dev/null +++ b/test36.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +'处理分布式进程发送的数据' + +__author__ = 'sergiojune' +from multiprocessing.managers import BaseManager +import time, queue + + +class QueueManager(BaseManager): + pass + + +# 注册到网络上 +QueueManager.register('post_task_queue') # 由于只是从网络上获取queue,所以不需要写callable方法 +QueueManager.register('result_task_queue') +# 连接到网络 +address = '127.0.0.1' # 这个是网络地址 +manager = QueueManager(address=(address, 500), authkey=b'abc') # 这些必须与发送的一致,要不会连不上 +# 连接 +manager.connect() +# 获取queue +post = manager.post_task_queue() +result = manager.result_task_queue() + +# 处理数据 +print('tyr get value') +for x in range(10): + try: + v = post.get(timeout=10) + print('get value %d' % v) + r = v*v + print('put value %d to result' % r) + time.sleep(1) + result.put(r) + except queue.Empty as e: + print('Queue is empty') +print('work exit') diff --git a/test37.py b/test37.py new file mode 100644 index 0000000..4940645 --- /dev/null +++ b/test37.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +'练习正则表达式' + +__author__ = 'sergiojune' +import re + + +# 作业:尝试写一个验证Email地址的正则表达式。版本一应该可以验证出类似的Email +a = 'someone@gmail.com' +b = 'bill.gates@microsoft.com' +# 匹配邮箱的正则表达式,返回一个正则对象 +re_mail = re.compile('[\.\w]*@[\w]+.[\w]+') +m = re_mail.match(a) +print(m.group()) +g = re_mail.match(b) +print(g.group()) + + +# 版本二: +a = ' tom@voyager.org => Tom Paris' +b = 'bob@example.com => bob' +mail = re.compile('([\w]+|[\w\s]+)@[\w]+.[\w]+') +aa = mail.match(a) +bb = mail.match(b) +print(aa.group()) +print(bb.group()) diff --git a/test38.py b/test38.py new file mode 100644 index 0000000..61ce759 --- /dev/null +++ b/test38.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +'练习内建模块致datetime' + +__author__ = 'sergiojune' +from datetime import datetime, timedelta, timezone +import re + +# 获取现在时间 +dt = datetime.now() +print(dt) +print(type(dt)) + +# 获取指定某日的时间,即是创建实例 +ddt = datetime(2017, 10, 15, 12, 35, 56) +print(ddt) # 类型为datetime.datetime + +# 将datetime转为timestamp +ts = dt.timestamp() +print(ts) +print(type(ts)) # 类型为float +# 将timestamp转为datetime +d = datetime.fromtimestamp(ts) +print(d) +# 将timestamp转为标准的utc时间,比北京时间少了八小时 +d = datetime.utcfromtimestamp(ts) +print(d) + +# 将字符串转为时间 +s = '2017:12:12 11:11:11' +sd = datetime.strptime(s, '%Y:%m:%d %H:%M:%S') # 第二个参数为字符串的时间格式 +print(sd) +print(type(sd)) + +# 将时间转为字符串,参数为转为字符串的格式 +ds = sd.strftime('%Y:%m:%d %H:%M:%S') +print(ds) +print(type(ds)) + + +# 将时间进行加减 +print('之前:', dt) +dt = dt + timedelta(hours=5,minutes=25) +print(dt) +dt = dt + timedelta(days=5) +print(dt) + +print('-------------------') +# 将本地datetime设一个时区 +tz_local = timezone(timedelta(hours=8)) +d = datetime.now() +print(d) +# 强行设置时区,tzinfo就是时区信息 +now = d.replace(tzinfo=tz_local) +print(now) + +print('----------------------') +# 时区时间任意转换 +# 拿到utc时间,并设置时区 +d = datetime.utcnow().replace(tzinfo=timezone.utc) +print(d) +# 转为北京时间的时区 +bj_utc = d.astimezone(tz=timezone(timedelta(hours=8))) +print(bj_utc) +# 转为东京时间 +dj_utc = d.astimezone(tz=timezone(timedelta(hours=9))) +print(dj_utc) +# 也可以直接利用北京时间转为东京时间 +dj_utc = bj_utc.astimezone(tz=timezone(timedelta(hours=9))) +print(dj_utc) +# 所以用astimezone()这个方法可以任意转换时区 + + +# 作业:假设你获取了用户输入的日期和时间如2015年1月21日 9:01:30,以及一个时区信息如UTC+5:00,均是str,请编写一个函数将其转换为timestamp +def to_timestamp(time, tz): + # 将字符串转为时间 + time = datetime.strptime(time, '%Y-%m-%d %H:%M:%S') + # 设置该时间的时区 + time = time.replace(tzinfo=timezone(timedelta(hours=tz))) + time = time.timestamp() + return time + + +print('---------------') +time1 = '2015-6-1 08:10:30' +utc1 = 'UTC+7:00' +# 用正则匹配出时间 +utc1 = int(re.search('UTC([+-][\d]{1,2}):[\d]{2}', utc1).group(1)) +print(utc1) +time = to_timestamp(time1, utc1) +print(time) + +time2 = '2015-5-31 16:10:30' +utc2 = 'UTC-09:00' +# 用正则匹配出时间 +utc2 = int(re.search('UTC([+-][\d]{1,2}):[\d]{2}', utc2).group(1)) +print(utc2) +time2 = to_timestamp(time2, utc2) +print(time2) diff --git a/test39.py b/test39.py new file mode 100644 index 0000000..1455bf0 --- /dev/null +++ b/test39.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +'练习内建模块之collections' + +__author__ = 'sergiojune' +from collections import namedtuple, defaultdict, deque, OrderedDict, Counter + +# 弄一个可以根据名字来取数的元组 +at = namedtuple('Point', ['x', 'y']) # 第一个参数为描述事物类型, 第二个参数为元组的位置名字 +p = at(1, 2) # 新建一个元组 +# 根据名字来取元素 +print(p.x) # 这就可以看作是一个坐标点 +print(p.y) + +# 有默认值的dict +dd = defaultdict(lambda:'not') # 参数为遇到不存在键时的处理的方法 +dd['age'] = 20 +print(dd['age']) # 存在 +print(dd['a']) # 不存在 + + +# 使用队列,比list的插入数据和删除数据快,支持从头或者尾删除或插入 +d = deque([1, 2, 3, 6]) +print(d) +# 从头插入数据 +d.appendleft(5) +print(d) +# 删除头部数据 +d.popleft() +print(d) + +# 让字典保持有序 +od = OrderedDict([('x', 3), ('y', 6), ('z', 6)]) +print(od) +# 还可以添加,与dict用法一样 +ood = OrderedDict() +ood['f'] = 5 +ood['s'] = 9 +ood['e'] = 7 +print(ood) + + +# 用计数器,直接算出某一个字符串里面字符出现的个数 +s = 'mynameissergiojune' +c = Counter(s) +print(c) + + +# 利用OrderedDict实现一个先进先出的字典,超过最大容量的时候就删除 +class LastUpdateDict(OrderedDict): + def __init__(self, max): + super(LastUpdateDict, self).__init__() + self.max = max + + def __setitem__(self, key, value): + # 看看有没有重复键 + contains = 1 if key in self.keys() else 0 + # 判断最大长度 + if len(self) - contains>= self.max: + last = self.popitem(last=False) # last 为false时就删除第一个添加的键值对,否则删除最后的键值对 + print('pop', last) + # 增加元素 + if contains: # 键原来存在,直接修改 + del self[key] + print('update', key, value) + else: + print('add', key, value) + OrderedDict.__setitem__(self, key, value) + + +lud = LastUpdateDict(3) +lud['a'] = 1 +lud['b'] = 2 +lud['c'] = 3 +print(lud) +lud['d'] = 4 +print(lud) +lud['b'] = 5 +print(lud) diff --git a/test40.py b/test40.py new file mode 100644 index 0000000..1b03856 --- /dev/null +++ b/test40.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +'练习内建模块之base64' + +__author__ = 'sergiojune' +import base64 + +# 进行base64编码,转为字符串 +b = b'binary\x00strg=' +bs = base64.b64encode(b) +print(bs) +# 解码 +b = base64.b64decode(bs) +print(b) + + +# 对于网页的安全编码 +s = b'i\xb7\x1d\xfb\xef\xff' +bs = base64.b64encode(s) +print(bs) +bs = base64.urlsafe_b64encode(s) +print(bs) + + +# 作业:请写一个能处理去掉=的base64解码函数 +def safe_base64_decode(s): + while len(s) % 4 !=0: + s += b'=' + bs = base64.b64decode(s) + return bs + + +# 测试: +assert b'abcd' == safe_base64_decode(b'YWJjZA=='), safe_base64_decode('YWJjZA==') +assert b'abcd' == safe_base64_decode(b'YWJjZA'), safe_base64_decode('YWJjZA') +print('ok') \ No newline at end of file diff --git a/test41.py b/test41.py new file mode 100644 index 0000000..a32a30a --- /dev/null +++ b/test41.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 +# -*- conding: utf-8 -*- + +'练习内建模块之struct' + +__author__ = 'sergiojune' +import struct,base64 + +# 这个模块是将bytes与其他二进制数据互相转换 +# 将任意数据类型转为bytes +i = 10249999 +b = struct.pack('>I', i) # 第一个参数为处理指令 +print(b) + +s = 123.456 +b = struct.pack('f', s) +print(b) + + +# 将bytes转为其他任意类型 +s = struct.unpack('f', b) +print(s) + +s = b'\x42\x4d\x38\x8c\x0a\x00\x00\x00\x00\x00\x36\x00\x00\x00\x28\x00\x00\x00\x80\x02\x00\x00\x68\x01\x00\x00\x01\x00\x18\x00' +un = struct.unpack('' % self.name, end='') + + def put_end(self): + print('' % self.name) + + +@contextlib.contextmanager +def create(name): + h = Html(name) + h.put_start() + yield h + h.put_end() + + +with create('h1') as h: + print('我是标题', end='') + + +# 如果一个对象没有实现上下文方法,我们还可以用colsing这个方法 +with contextlib.closing(requests.get('https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001478651770626de401ff1c0d94f379774cabd842222ff000')) as f: + print(f.text) + + + diff --git a/test46.py b/test46.py new file mode 100644 index 0000000..3ef2473 --- /dev/null +++ b/test46.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python3 +# -*- conding: utf-8 -*- + +'练习内建模块之HTMLParser' + +__author__ = 'sergiojune' +from html.parser import HTMLParser +import requests + + +class MyHTMLParser(HTMLParser): + + def handle_starttag(self, tag, attrs): # 这个是处理开始标签 + print('<%s>' % tag, list(attrs)) + + def handle_endtag(self, tag): # 这个是处理结束标签 + print('' % tag) + + def handle_data(self, data): # 这个是处理标签里的内容 + print(data) + + def handle_comment(self, data): # 这个是处理注释 + print('') + + def handle_entityref(self, name): # 这个是处理特殊字符,比如 + print('&%s;' % name) + + def handle_charref(self, name): # 这个是处理特殊字符,比如Ӓ + print('&#%s;' % name) + + +parser = MyHTMLParser() +parser.feed(''' + +
+ +

Some html HTML tutorial...
END

+

AltStyle によって変換されたページ (->オリジナル) /

''') + + +# 作业:找一个网页,例如https://www.python.org/events/python-events/,用浏览器查看源码并复制,然后尝试解析一下HTML,输出Python官网发布的会议时间、名称和地点。 +class DealHTML(HTMLParser): + def __init__(self): + super(DealHTML, self).__init__() + self.thing = 0 + self.time = 0 + self.address = 0 + + def handle_starttag(self, tag, attrs): + if len(attrs) == 1: + if 'python-events' in list(attrs)[0][1]: # 获取工作事件 + print('' % list(attrs)[0][1], end='') + self.thing = 1 + if 'datetime' in list(attrs)[0][0]: # 获取工作时间 + print('<%s>' % list(attrs)[0][0], end='') + self.time = 1 + if 'location' in list(attrs)[0][1]: # 获取工作地点 + print('<%s>' % list(attrs)[0][1], end='') + self.address = 1 + + def handle_data(self, data): + if self.thing: + print(data, end='') + if self.time: + print(data, end='') + if self.address: + print(data, end='') + + def handle_endtag(self, tag): + if self.thing: + print('' % tag) + self.thing = 0 + if self.time: + print('' % tag) + self.time = 0 + if self.address: + print('' % tag) + print('') + self.address = 0 + + +response = requests.get('https://www.python.org/events/python-events/').text +dh = DealHTML() +dh.feed(response) diff --git a/test47.py b/test47.py new file mode 100644 index 0000000..360161a --- /dev/null +++ b/test47.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python3 +# -*- conding: utf-8 -*- + +'练习第三方模块之pillow' + +__author__ = 'sergiojune' +from PIL import Image, ImageFilter, ImageDraw, ImageFont +import random +# 对图像进行缩小 +im = Image.open('test.jpg') +w, h = im.size # 获取图片的宽高 +print('origin image width is %d, height is %d' % (w, h)) +# 进行缩小一倍 +im.thumbnail((w//2, h//2)) # 参数是一个元组,对应的是宽高 +im.save('suoxiao.png', 'png') +print('now width is %d, height is %d' % (im.size[0], im.size[1])) + +# 还可以对图像进行模糊化 +im2 = im.filter(ImageFilter.BLUR) +im2.save('muhu.png', 'png') + + +# 利用这个模块产生验证码 +def rndchar(): # 产生随机字符 + return chr(random.randint(65, 90)) + + +def rndcolor(): # 随机颜色 + return (random.randint(64, 255), random.randint(64, 255), random.randint(64, 255)) + + +def rndcolor2(): + return (random.randint(32, 127), random.randint(32, 127), random.randint(32, 127)) + + +width = 240 +height = 40 +# 创建图像对象 +im = Image.new('RGB', (width, height), (255, 255, 255)) +# 创建字体 +font = ImageFont.truetype(r'E:\python_project\Lib\site-packages\matplotlib\mpl-data\fonts\ttf\cmsy10.ttf', 36) +# 创建画笔 +draw = ImageDraw.Draw(im) +# 填充图片 +for x in range(width): + for y in range(height): + draw.point((x, y), fill=rndcolor2()) +# 写文字 +for x in range(4): + draw.text((60*x+10, 10), rndchar(), font=font, fill=rndcolor()) + +# mohu +im.filter(ImageFilter.BLUR) +# 保存图片 +im.save('yan.png', 'png') diff --git a/test48.py b/test48.py new file mode 100644 index 0000000..64ee6ab --- /dev/null +++ b/test48.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python3 +# -*- conding: utf-8 -*- + +'练习第三方模块之chardet' + +__author__ = 'sergiojune' +import chardet # 这个库是用来猜测字节码的编码方式的 + +s = b'hello world' +c = chardet.detect(s) +print(c) +# 结果:{'encoding': 'ascii', 'confidence': 1.0, 'language': ''},可以看出是ascii编码,第二个为概率,1.0表示百分百 + +s = '中国中文我爱你' +c = chardet.detect(s.encode('gbk')) +print(c) + +c = chardet.detect(s.encode('utf-8')) +print(c) + +# 看看日语的 +s = '最新の主要ニュース' +c = chardet.detect(s.encode('euc-jp')) +print(c) + +# encode()为编码,将字符串变为字节码,decode()为解码,将字节码转为字符串 diff --git a/test49.py b/test49.py new file mode 100644 index 0000000..ec6e3a6 --- /dev/null +++ b/test49.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +'用于练习网络编程TCP' + +__author__ = 'sergiojune' +import socket, threading, time + +# 创建socket +# s = socket.socket(socket.AF_INET , socket.SOCK_STREAM) # 第一个参数为指定ipv4模式, 第二个参数指定为tup模式的面向流 +# # 建立连接 +# s.connect(('www.sina.com.cn', 80)) +# # 发送请求 +# s.send(b'GET / HTTP/1.1\r\nHost: www.sina.com.cn\r\nConnection: close\r\n\r\n') # 发送get请求,http协议 +# # 处理返回来的数据 +# buffer = [] +# while True: +# # 指定最大接受1024个字节 +# d = s.recv(1024) +# if d: +# buffer.append(d) +# else: +# break +# data = b''.join(buffer) +# # 关闭连接 +# s.close() +# 取出请求头和内容 +# header, body =data.split(b'\r\n') +# print(header) +# print(body) + + +# 建立服务器端 +s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +# 绑定端口和地址,当有信息从这个端口发送过来时就捕捉 +s.bind(('127.0.0.1', 9999)) # 这个为本机ip,这个服务器只能接受本地的 +# 监听窗口 +s.listen(5) # 传入参数为最大的等待连接数 +print('await for connect') +def tcplink(sock, add): + print('here is a connector from %s:%s' % add) + # 向客户端发送数据 + sock.send(b'Welcome') + # 处理客户端发送来的数据 + while True: + d = sock.recv(1024) + time.sleep(1) + if not d or d == b'exit': + break + sock.send(('hello %s !' % d.decode('utf-8')).encode('utf-8')) + # 关闭连接 + sock.close() + print('connect is closed') + +# 用永久循环来等待客户端连接 +while True: + # 这个函数会返回一个客户端连接 + sock, add = s.accept() + # 创建线程来处理客户端的数据 + t = threading.Thread(target=tcplink, args=(sock, add)) + # 开启线程 + t.start() diff --git a/test50.py b/test50.py new file mode 100644 index 0000000..721bcf3 --- /dev/null +++ b/test50.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +'配合test49文件的服务器,这文件当作客户端' + +__author__ = 'sergiojune' +import socket + +# 建立socket +s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +# 连接地址 +s.connect(('127.0.0.1', 9999)) +# 接受返回的数据 +print(s.recv(1024).decode('utf-8')) +# 发送数据 +for x in [b'bob', b'amy', b'june']: + print('send %s' % x) + s.send(x) + print(s.recv(1024).decode('utf-8')) +# 发送退出数据 +s.send(b'exit') +s.close() +print('connect is closed from kehuuduan') diff --git a/test51.py b/test51.py new file mode 100644 index 0000000..81a0fac --- /dev/null +++ b/test51.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +'练习UDP网络协议' + +__author__ = 'sergiojune' +import socket + +# 此文件用于当服务器 +# 创建socket +s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # 第二个参数为指定udp协议 +# 绑定端口,不需要监听 +s.bind(('127.0.0.1', 9999)) +print('await connect') +while True: + data, addr = s.recvfrom(1024) # 这个直接返回客户端的ip和请求信息 + print('connect form %s:%s' % addr) + # 发送数据回客户端 + s.sendto(b'hello %s' % data, addr) # 第二个参数为发送到的ip diff --git a/test52.py b/test52.py new file mode 100644 index 0000000..54ea8f1 --- /dev/null +++ b/test52.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +'udp配合test51文件的服务器,这文件当作客户端' + +__author__ = 'sergiojune' +import socket + +# 创建socket +s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +# UDP协议不需要建立连接 +for x in [b'bret', b'tom', b'sergiojune']: + s.sendto(x, ('127.0.0.1', 9999)) + print(s.recv(1024).decode('utf-8')) +s.close() diff --git a/test53.py b/test53.py new file mode 100644 index 0000000..d7f37b8 --- /dev/null +++ b/test53.py @@ -0,0 +1,37 @@ +#!/usr.bin.env python 3 +# -*- coding: utf-8 -*- + +''' +练习python的协程0 +next(方法和next()方法在一定意义上是相同用法的,只不过next不能传递特定的值而next可以传递特定的值 +next() 和send(None) 方法是等价的,都是用于启动生成器返回值 +''' + +__author__ = 'sergiojune' + + +# 消费者 +def consumer(): + r = '' + while True: + n = yield r # 接受调用者发送的数据 + if not n: + return + print('consumer consume is %s' % n) + r = '200 OK' + + +def producer(c): # 这个函数先被执行 + c.send(None) # 这个语句必须写,而且参数固定,作用是启动上面的生成器 + x = 0 + while x < 5: + x = x+1 + print('producer is produce %d' % x) + r = c.send(x) # 发送数据,生成器接受数据,赋值给n变量 + print('consumer is return %s' % r) + # 关闭 + c.close() + + +c = consumer() +producer(c) diff --git a/test54.py b/test54.py new file mode 100644 index 0000000..5b29d01 --- /dev/null +++ b/test54.py @@ -0,0 +1,56 @@ +#!/usr.bin.env python 3 +# -*- coding: utf-8 -*- + +''' +练习python的asyncio +yield form 语句返回的值是后面表达式迭代后遇到StopIteration后再return(这个语句) 的值,无这个语句是返回None +''' + +__author__ = 'sergiojune' +import asyncio,threading + + +@asyncio.coroutine # 这个装饰器是把生成器标记成为coroutine +def hello(): + r = '' + print('hello world(%s)' % threading.current_thread()) + n = yield from asyncio.sleep(1) # 这个方法也是个coroutine,执行到这里后会等待该方法返回数给这个coroutine + print('hello again(%s)' % threading.current_thread()) + + +# 这是练习一个协程的 +# # 获取事件循环,就是eventloop +# loop = asyncio.get_event_loop() +# # 执行coroutine,参数是需要运行的协程 +# loop.run_until_complete(hello()) +# # 关闭 +# loop.close() + + +# 这个执行两个hello +# l = asyncio.get_event_loop() +# # 把协程放到eventloop里面 +# # 这两个方法是并发执行的 +# l.run_until_complete(asyncio.wait([hello(), hello()])) # wait()是将参数里的协程转为一个包括他们在内的单独协程 +# l.close() + + +# 练习用异步连接新浪搜狐和网易云 +def wget(host): + print('ready ro connect %s' % host) + connect = asyncio.open_connection(host, 80) + reader, writer = yield from connect # 这个io操作比较耗时,所以会执行下个协程 + header = 'GET/HTTP/1.1\r\nHost:%s\r\n\r\n' % host + writer.write(header.encode('utf-8')) + yield from writer.drain() # 用于刷新缓存区的内容,确保将内容提交上去 + while True: + line = yield from reader.readline() # 这个io操作有时不耗时,会直接运行整个循环 + if line == b'\r\n': + break + print('%s header>%s' % (host, line.decode('utf-8').strip())) + writer.close() + + +loop = asyncio.get_event_loop() +loop.run_until_complete(asyncio.wait([wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']])) +loop.close() diff --git a/test55.py b/test55.py new file mode 100644 index 0000000..af91a14 --- /dev/null +++ b/test55.py @@ -0,0 +1,52 @@ +#!/usr.bin.env python 3 +# -*- coding: utf-8 -*- + +''' +练习python的async/await +async:相当于asyncio.coroutine,可以用这个替换这个更加方便 +await:相当于yield form,把这个改成await即可替代 +以上都是在python3.5版本以上才有的 +''' + +__author__ = 'sergiojune' +import asyncio + + +async def hello(): # 这样就是相当于一个coroutine了 + print('hello world') + await asyncio.sleep(1) + print('hello again') + + +# loop = asyncio.get_event_loop() +# loop.run_until_complete(hello()) +# loop.close() + + +# 两个协程 +# loop = asyncio.get_event_loop() +# loop.run_until_complete(asyncio.wait([hello(), hello()])) +# loop.close() + + +# 再用这个方法连接搜狐新浪和网易 +async def wegt(host): + print('wegt %s ' % host) + # 连接 + connect = asyncio.open_connection(host, 80) + reader, writer = await connect + header = 'GET/HTTP/1.0\r\nHost:%s\r\n\r\n' % host + writer.write(header.encode('utf-8')) + await writer.drain() # 相当于刷新缓存 + while True: + line = await reader.readline() + if line == b'\r\n': + print() + break + print('%s header> %s' % (host, line.decode('utf-8').strip())) + connect.close() + + +loop = asyncio.get_event_loop() +loop.run_until_complete(asyncio.wait([wegt(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']])) +loop.close() diff --git a/test56.py b/test56.py new file mode 100644 index 0000000..f414767 --- /dev/null +++ b/test56.py @@ -0,0 +1,38 @@ +#!/usr.bin.env python 3 +# -*- coding: utf-8 -*- + +''' +练习python的aiohttp +练习服务器上的异步操作 +''' + +__author__ = 'sergiojune' +import asyncio +from aiohttp import web + + +async def index(request): # 首页返回一个h1标签 + await asyncio.sleep(0.5) + return web.Response(body=b'

index

', content_type='text/html') + + +async def hello(request): # 根据url参数返回信息 + await asyncio.sleep(0.5) + return web.Response(body=b'

hello %s

' % request.match_info['name'], content_type='text/html') + + +# 初始化服务器,也是一个coroutine +async def init(loop): # 接受协程池 + app = web.Application(loop=loop) + # 添加反应路径 + app.router.add_route('GET', '/', index) + app.router.add_route('GET', '/hello/{name}', hello) + s = loop.create_server(app.make_handler(), '', 80) # 用loop.create_server创建tcp协议 + print('sever is start in 127.0.0.1:80') + return s + + +loop = asyncio.get_event_loop() +loop.run_until_complete(init(loop)) +loop.run_forever() # 服务器永远运行 +