socket_udp UDP协议的通信优势:允许一个服务器同时和多个客户端通信 服务端: import socket server = socket.socket(type=socket.SOCK_DGRAM) # udp协议 server.bind(('127.0.0.1', 12345)) # 收发 while 1: msg, addr =
socket_udp
UDP协议的通信优势:允许一个服务器同时和多个客户端通信
服务端:
import socketserver = socket.socket(type=socket.SOCK_DGRAM) # udp协议
server.bind(('127.0.0.1', 12345))
# 收发
while 1:
msg, addr = server.recvfrom(1024) # 接受来自哪里的消息,addr元组类型,是ip和端口
print(msg.decode('utf-8'))
data = input('>>>')
server.sendto(data.encode('utf-8'), addr) # 发送消息
客户端:
import socketclient = socket.socket(type=socket.SOCK_DGRAM)
while 1:
msg_s = input(">>>")
client.sendto(msg_s.encode('utf-8'), ('127.0.0.1', 12345))
msg, addr = client.recvfrom(1024)
print(msg.decode('utf-8'))
继成 socket 类
My_UDP.py
import socketclass MySocket(socket.socket): # 继承自 socket文件中的socket类,此时socket就是父类
def __init__(self, encoding='utf-8'):
self.encoding = encoding
super(MySocket, self).__init__(type=socket.SOCK_DGRAM) # 执行父类socket中的__init__方法
def my_sendto(self, msg, addr):
return self.sendto(msg.encode(self.encoding), addr) # 调用父类中的sendto方法
def my_recvfrom(self, num):
msg_r, addr = self.recvfrom(num) # 调用父类的recvfrom方法
return
服务端
from My_UDP import MySocketsk = MySocket()
sk.bind(('127.0.0.1', 8080))
msg, addr = sk.my_recvfrom(1024)
print(msg)
sk.close()
客户端
from My_UDP import MySocketsk = MySocket()
sk.my_sendto('abcabc中国', ('127.0.0.1', 8080))
sk.close()
上传小文件
服务端
import socketimport json
sk = socket.socket()
sk.bind(('127.0.0.1', 8080))
sk.listen()
conn, addr = sk.accept()
str_dic = conn.recv(9090).decode('utf-8')
dic = json.loads(str_dic)
# dic = {'opt':menu.get(num),'filename':filename,'content':content}
if dic['opt'] == 'upload':
# 上传
filename = '1' + dic['filename']
with open(filename, 'w', encoding='utf-8') as f:
f.write(dic['content'])
elif dic['opt'] == 'download':
# 下载
pass
conn.close()
sk.close()
客户端
import socketimport os
import json
sk = socket.socket()
sk.connect_ex(('127.0.0.1', 8080)) # 带返回值,如果出错,不会报错,会返回错误的编码
# sk.connect()# 会直接报错 两个都是连接服务器的功能
menu = {'1': 'upload', '2': 'download'}
for k, v in menu.items():
print(k, v)
num = input('请输入功能选项:')
if num == '1':
# 上传功能
# {你要执行的功能, 文件名, 文件内容}
dic = {'opt': menu.get(num), 'filename': None, 'content': None}
file_path = input('请输入一个文件的绝对路径>>>')
filename = os.path.basename(file_path)
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
dic['filename'] = filename
dic['content'] = content
str_dic = json.dumps(dic)
sk.send(str_dic.encode('utf-8'))
elif num == '2':
# 下载功能
pass
else:
print('错误')
sk.close()