我在修改其中代码的时候没有考虑到多进程(multiprocessing)与多线程(threading)的区别,所以代码中有混用的情况,请注意这个情况,在此不做仔细区分。 下面多进程/多线程客户端的代码是相同的,可以多建几个文件只改变发送的视频文件,模拟多客户端。
先运行接收端(服务器),再运行发送端(客户端)。IP地址要在同一网段下,即前三段数字相同;发送端被拒绝连接时,要查看端口是否被占用。参考:Python socket传输MP4,MP3
# ===== Script 1: sender (client) =====
import socket
import os
from time import *
import sys


def SendVideo():
    """Send one video file to the receiver over TCP and print the transfer rate.

    Connects to the hard-coded receiver address, reads the whole file into
    memory, sends it, and prints the elapsed time and the rate in MB/s.
    Exits the process with status 1 if the connection cannot be established.
    """
    address = ('172.23.13.226', 8105)  # original note: 192.168.1.103 ubuntu
    videopath = "../testvideo/video0_24.flv"
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect(address)
    except socket.error as msg:
        print(msg)
        sys.exit(1)

    # `with sock` closes the socket even if reading or sending the file fails.
    with sock:
        video_size = os.path.getsize(videopath) / 1024 / 1024  # bytes -> MB
        with open(videopath, 'rb') as f:
            stringData = f.read()  # file opened in binary mode, so this is bytes
        start_time = time()
        # sendall() keeps writing until the whole buffer is sent; a single
        # send() may transmit only part of a large buffer.
        sock.sendall(stringData)
        All_time = time() - start_time

    # A small file can finish within the timer resolution; avoid dividing by 0.
    rate = round(video_size / All_time, 2) if All_time > 0 else float('inf')
    print("[*]发送时间(sec):%s" % (All_time))
    print(f"[*]Rate: {rate}MB/s")
    print('[*]This path send over')


if __name__ == '__main__':
    SendVideo()


# ===== Script 2: receiver (server), one process per client =====
import socket
import time
import os
from multiprocessing import Process


def ReceiveVideo(conn, addr):
    """Receive one file from a connected client and save it under ./receive_video/.

    Args:
        conn: connected socket returned by ``accept()``; closed before returning.
        addr: peer address, used only for the log line.

    The output file is named after the local time at which the connection was
    handled and is opened in append mode, so two connections handled within
    the same second write to the same file.
    """
    print('[*]connect from:' + str(addr))
    start = time.time()
    videoname = "%s.flv" % (time.strftime("%Y-%m-%d-%H_%M_%S", time.localtime(time.time())))
    videopath = "./receive_video/" + videoname
    # Create the target directory up front so open() cannot fail on a fresh checkout.
    os.makedirs("./receive_video", exist_ok=True)

    # Open the file once for the whole transfer instead of once per chunk, and
    # close the connection when the transfer ends.
    with conn, open(videopath, 'ab') as f:
        while True:
            stringData = conn.recv(1024)  # read up to 1024 bytes at a time
            if not stringData:  # empty read: the sender closed the connection
                break
            f.write(stringData)

    end = time.time()
    print('[*]process time (sec): ', end - start)
    video_size = os.path.getsize(videopath)  # size in bytes
    print(f"[*]This File is {round(video_size/1024/1024,2)} MB")
    print('[*]This path send over')


if __name__ == '__main__':
    address = ('172.23.13.226', 8105)  # original note: '192.168.1.104', 8004
    # The context manager closes the listening socket when the loop is
    # interrupted; a close() placed after `while True` would never run.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(address)
        s.listen(5)
        print(f"[*]Listening: {address}")
        while True:
            conn, addr = s.accept()
            p = Process(target=ReceiveVideo, args=(conn, addr))
            # daemon defaults to False. When set to True the process is a
            # background daemon: it is terminated when its parent exits and it
            # may not create child processes. It must be set before start().
            p.start()
            # The worker process owns its own handle to the connection now;
            # close the parent's copy so it does not leak one socket per client.
            conn.close()

# Reference: "Python3远程监控程序实现"; "基于python和opencv的视频传输程序(一)"
# ===== Script 1: sender (captures the camera and streams JPEG frames) =====
import socket
import struct
import time
import cv2
import numpy

# Frame header: payload length, width, height. The '<' prefix selects standard
# sizes (4 + 2 + 2 = 8 bytes) and little-endian order on every platform. The
# native format 'lhh' is 8 bytes on Windows but 12 bytes on 64-bit Linux, which
# breaks Windows -> Linux transfers. Both ends must use the same format.
FRAME_HEADER = struct.Struct('<lhh')


class Config(object):
    """Connects to the receiver and streams camera frames as JPEG images."""

    def __init__(self):
        """Set the stream parameters and connect to the receiver."""
        self.TargetIP = ('172.23.13.216', 6666)
        self.resolution = (720, 480)  # frame size (width, height)
        # NOTE(review): this value is passed to OpenCV as the
        # IMWRITE_JPEG_QUALITY setting below, so it acts as a JPEG quality,
        # not as a frame rate.
        self.img_fps = 15
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.connect(self.TargetIP)
        self.img = ''
        self.img_data = ''

    def RT_Image(self):
        """Capture frames, JPEG-encode them and send them until an error occurs.

        Each frame is sent as an 8-byte FRAME_HEADER followed by the JPEG bytes.
        The camera and the socket are released when the loop ends.
        """
        camera = cv2.VideoCapture(0)  # original note: "../testvideo/video0_24.flv"
        img_param = [int(cv2.IMWRITE_JPEG_QUALITY), self.img_fps]
        try:
            while True:
                ret, self.img = camera.read()
                if not ret:
                    # No frame available (camera missing or stream ended);
                    # resizing a missing frame would raise.
                    print("[*]failed to read a frame from the camera")
                    return
                self.img = cv2.resize(self.img, self.resolution)
                _, img_encode = cv2.imencode('.jpg', self.img, img_param)
                # tobytes() replaces the deprecated ndarray.tostring().
                self.img_data = numpy.array(img_encode).tobytes()
                try:
                    packet = FRAME_HEADER.pack(len(self.img_data), self.resolution[0], self.resolution[1])
                    # sendall() guarantees the header and the whole image are
                    # written; send() may stop part-way through.
                    self.server.sendall(packet + self.img_data)
                except Exception as e:
                    print(e.args)
                    return
        finally:
            camera.release()
            self.server.close()


if __name__ == '__main__':
    config = Config()
    config.RT_Image()


# ===== Script 2: receiver (displays the stream and records it to an .avi file) =====
import socket
import cv2
import struct
import numpy
import threading
import time

# Must match the sender's header format (repeated because this is a separate file).
FRAME_HEADER = struct.Struct('<lhh')


class Camera_Connect_Object(object):
    """Accepts one sender, shows its frames in a window and records them."""

    def __init__(self, TargetIP=('', 6666)):
        """Create the listening socket bound to ``TargetIP``."""
        self.TargetIP = TargetIP
        self.resolution = (720, 480)
        self.src = 888 + 15
        self.interval = 0
        self.img_fps = 15
        self.Server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.Server.bind(self.TargetIP)
        self.Server.listen(5)

    def _recv_exact(self, size):
        """Read exactly ``size`` bytes from the client; return None if it closes first."""
        received = bytearray()
        while len(received) < size:
            part = self.client.recv(size - len(received))
            if not part:  # empty read: the peer closed the connection
                return None
            received += part
        return bytes(received)

    def RT_Image(self):
        """Accept one sender, then display and record frames until ESC or disconnect."""
        fourcc = cv2.VideoWriter_fourcc('X', 'V', 'I', 'D')  # codec for the .avi file
        videoWriter = cv2.VideoWriter(
            './%s.avi' % time.strftime("%Y-%m-%d-%H_%M_%S", time.localtime(time.time())),
            fourcc, 30, (720, 480))
        self.client, self.addr = self.Server.accept()
        self.name = self.addr[0] + " Camera"
        print(self.name)
        try:
            while True:
                # TCP is a byte stream: a single recv(8) may return fewer than
                # 8 bytes, so read until the full header has arrived.
                tempdata = self._recv_exact(FRAME_HEADER.size)
                if tempdata is None:
                    print("[*]sender closed the connection")
                    break
                info = FRAME_HEADER.unpack(tempdata)
                buf_size = int(info[0])
                if buf_size <= 0:
                    continue
                self.buf = self._recv_exact(buf_size)
                if self.buf is None:
                    print("[*]sender closed the connection")
                    break
                try:
                    # frombuffer() replaces the deprecated binary fromstring().
                    data = numpy.frombuffer(self.buf, dtype='uint8')
                    self.image = cv2.imdecode(data, 1)
                    videoWriter.write(self.image)
                    cv2.imshow(self.name, self.image)
                except Exception as e:
                    # A single undecodable frame should not stop the stream.
                    print(e.args)
                if cv2.waitKey(10) == 27:  # refresh the window; ESC (27) quits
                    break
        finally:
            self.client.close()
            cv2.destroyAllWindows()
            videoWriter.release()

    def Get_data(self):
        """Run RT_Image in a worker thread and wait for it to finish."""
        showThread = threading.Thread(target=self.RT_Image)
        showThread.start()
        showThread.join()


if __name__ == '__main__':
    camera = Camera_Connect_Object()
    camera.Get_data()

# Author's note: the original version of this program sent and received
# correctly on my own Windows machine, but sending from Windows to Ubuntu
# failed, and Ubuntu reported an error at
# "info = struct.unpack('lhh', tempdata)".
# Cause: the native 'lhh' format is 8 bytes on Windows but 12 bytes on 64-bit
# Linux, so the 8-byte header could not be unpacked there. The code above uses
# the fixed-size '<lhh' format on both ends instead.
参考 基于Python3 + OpenCV3.3.1的远程监控程序
# ===== Script 1: server (captures the camera and streams frames to each client) =====
import socket
import threading
import struct
import time
import cv2
import numpy

# Header used in both directions: (value, width, height). The '<' prefix
# selects standard sizes (4 + 2 + 2 = 8 bytes) and little-endian order on every
# platform; the native 'lhh' format is 12 bytes on 64-bit Linux, so an 8-byte
# read could not be unpacked there. Both ends must use the same format.
FRAME_HEADER = struct.Struct('<lhh')


def _recv_exact(sock, size):
    """Read exactly ``size`` bytes from ``sock``; return None if the peer closes first."""
    received = bytearray()
    while len(received) < size:
        part = sock.recv(size - len(received))
        if not part:  # empty read: the peer closed the connection
            return None
        received += part
    return bytes(received)


class Carame_Accept_Object:
    """Holds the listening socket and the stream settings negotiated with a client."""

    def __init__(self, S_addr_port=("", 8880)):
        """Store default settings and start listening on ``S_addr_port``."""
        self.resolution = (640, 480)  # frame size (width, height)
        # NOTE(review): passed to OpenCV as the IMWRITE_JPEG_QUALITY setting in
        # RT_Image, so it acts as a JPEG quality rather than a frame rate.
        self.img_fps = 15
        self.addr_port = S_addr_port
        self.Set_Socket(self.addr_port)  # create the listening socket

    def Set_Socket(self, S_addr_port):
        """Create, bind and listen on a TCP socket with SO_REUSEADDR enabled."""
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # allow port reuse
        self.server.bind(S_addr_port)
        self.server.listen(5)


def check_option(object, client):
    """Read the client's handshake and store the requested settings on ``object``.

    The first header field must be greater than the check value 888; the
    excess is stored as ``img_fps`` and the other two fields as the resolution.

    Returns:
        1 if the handshake is valid, 0 if it is invalid or the client
        disconnected before sending a full header.
    """
    try:
        header = _recv_exact(client, FRAME_HEADER.size)
    except OSError:
        return 0
    if header is None:
        return 0
    info = FRAME_HEADER.unpack(header)
    if info[0] > 888:
        object.img_fps = int(info[0]) - 888
        object.resolution = (info[1], info[2])  # must be a tuple for cv2.resize
        return 1
    return 0


def RT_Image(object, client, D_addr):
    """Stream JPEG-encoded camera frames to one client until sending fails.

    Args:
        object: the Carame_Accept_Object holding the stream settings.
        client: connected client socket; closed before returning.
        D_addr: client address (not used by this function).
    """
    try:
        if check_option(object, client) == 0:
            return
        camera = cv2.VideoCapture(0)  # read video from the local camera
        img_param = [int(cv2.IMWRITE_JPEG_QUALITY), object.img_fps]
        try:
            while True:
                time.sleep(0.1)  # pause this thread for 0.1 s between frames
                ok, object.img = camera.read()
                if not ok:
                    # No frame available; resizing a missing frame would raise.
                    return
                object.img = cv2.resize(object.img, object.resolution)
                _, img_encode = cv2.imencode('.jpg', object.img, img_param)
                # tobytes() replaces the deprecated ndarray.tostring().
                object.img_data = numpy.array(img_encode).tobytes()
                try:
                    # Header followed by the image; sendall() writes all of it.
                    client.sendall(
                        FRAME_HEADER.pack(len(object.img_data), object.resolution[0], object.resolution[1])
                        + object.img_data)
                except (OSError, struct.error):
                    # The client went away (or the frame cannot be described);
                    # stop streaming to this client.
                    return
        finally:
            camera.release()
    finally:
        client.close()


if __name__ == '__main__':
    camera = Carame_Accept_Object()
    while True:
        client, D_addr = camera.server.accept()
        clientThread = threading.Thread(None, target=RT_Image, args=(camera, client, D_addr,))
        clientThread.start()


# ===== Script 2: client (requests the stream and displays it) =====
import socket
import cv2
import threading
import struct
import numpy

# Must match the server's header format (repeated because this is a separate file).
FRAME_HEADER = struct.Struct('<lhh')


def _recv_exact(sock, size):
    """Read exactly ``size`` bytes from ``sock``; return None if the peer closes first."""
    received = bytearray()
    while len(received) < size:
        part = sock.recv(size - len(received))
        if not part:  # empty read: the peer closed the connection
            return None
        received += part
    return bytes(received)


class Camera_Connect_Object:
    """Connects to the camera server and displays the received frames."""

    def __init__(self, D_addr_port=None):
        """Store the server address; defaults to a fresh ``["", 8880]`` list.

        The default is created per instance because the main block assigns to
        ``addr_port[0]``, which would otherwise mutate a shared default list.
        """
        self.resolution = [640, 480]
        self.addr_port = ["", 8880] if D_addr_port is None else D_addr_port
        self.src = 888 + 15  # handshake value: check value 888 plus 15
        self.interval = 0  # display interval between pictures
        self.img_fps = 15

    def Set_socket(self):
        """Create the TCP client socket."""
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    def Socket_Connect(self):
        """Create the socket and connect to ``addr_port``."""
        self.Set_socket()
        self.client.connect(self.addr_port)
        print("IP is %s:%d" % (self.addr_port[0], self.addr_port[1]))

    def RT_Image(self):
        """Send the handshake, then display frames until ESC or disconnect."""
        self.name = self.addr_port[0] + " Camera"
        # Handshake: pack the check value and the wanted resolution.
        self.client.sendall(FRAME_HEADER.pack(self.src, self.resolution[0], self.resolution[1]))
        try:
            while True:
                # TCP is a byte stream: read until the full header has arrived.
                header = _recv_exact(self.client, FRAME_HEADER.size)
                if header is None:
                    print("[*]server closed the connection")
                    break
                buf_size = FRAME_HEADER.unpack(header)[0]  # length of the next image
                if buf_size <= 0:
                    continue
                self.buf = _recv_exact(self.client, buf_size)
                if self.buf is None:
                    print("[*]server closed the connection")
                    break
                try:
                    # frombuffer() replaces the deprecated binary fromstring().
                    data = numpy.frombuffer(self.buf, dtype='uint8')
                    self.image = cv2.imdecode(data, 1)  # decode the JPEG
                    cv2.imshow(self.name, self.image)
                except Exception as e:
                    # A single undecodable frame should not stop the stream,
                    # but report it instead of silently ignoring it.
                    print(e.args)
                if cv2.waitKey(10) == 27:  # refresh every 10 ms; ESC (27) quits
                    break
        finally:
            self.client.close()
            cv2.destroyAllWindows()

    def Get_Data(self, interval):
        """Start RT_Image in a background thread (``interval`` is not used)."""
        showThread = threading.Thread(target=self.RT_Image)
        showThread.start()


if __name__ == '__main__':
    camera = Camera_Connect_Object()
    camera.addr_port[0] = input("Please input IP:")
    camera.addr_port = tuple(camera.addr_port)  # socket.connect needs a tuple
    camera.Socket_Connect()
    camera.Get_Data(camera.interval)