香港云主机最佳企业级服务商!

ADSL拨号VPS包含了中国大陆(联通,移动,电信,)

中国香港,国外拨号VPS。

当前位置:云主机 > python >

电信ADSL拨号VPS
联通ADSL拨号VPS
移动ADSL拨号VPS

python实现的文件同步服务器实例


时间:2021-12-08 14:51 作者:admin


本文实例讲述了python/' target='_blank'>python实现的文件同步服务器。分享给大家供大家参考。具体实现方法如下:

服务端使用asyncore, 收到文件后保存到本地。

客户端使用pyinotify监视目录的变化 ,把变动的文件发送到服务端。

重点:

1. 使用structs打包发送文件的信息,服务端收到后,根据文件信息来接收客户端传送过来的文件。

2. 客户端使用多线程,pyinotify监视到文件变化,放到队列中,由另外一个线程发送。

上代码:

服务端:

# receive file from client and store them into file use asyncore.# #/usr/bin/Python #coding: utf-8 import asyncore import socket from socket import errno import logging import time import sys import struct import os import fcntl import threading from rrd_graph import MakeGraph try:   import rrdtool except (ImportError, ImportWarnning):   print "Hope this information can help you:"   print "Can not find pyinotify module in sys path, just run [apt-get install python-rrdtool] in ubuntu."   sys.exit(1) class RequestHandler(asyncore.dispatcher):   def __init__(self, sock, map=None, chunk_size=1024):     self.logger = logging.getLogger('%s-%s' % (self.__class__.__name__, str(sock.getsockname())))     self.chunk_size = chunk_size     asyncore.dispatcher.__init__(self,sock,map)     self.data_to_write = list()   def readable(self):     #self.logger.debug("readable() called.")     return True   def writable(self):     response = (not self.connected) or len(self.data_to_write)     #self.logger.debug('writable() -> %s data length -> %s' % (response, len(self.data_to_write)))     return response   def handle_write(self):     data = self.data_to_write.pop()     #self.logger.debug("handle_write()->%s size: %s",data.rstrip('\r\n'),len(data))     sent = self.send(data[:self.chunk_size])     if sent < len(data):       remaining = data[sent:]       self.data_to_write.append(remaining)   def handle_read(self):     self.writen_size = 0     nagios_perfdata = '../perfdata'     head_packet_format = "!LL128s128sL"     head_packet_size = struct.calcsize(head_packet_format)     data = self.recv(head_packet_size)     if not data:       return     filepath_len, filename_len, filepath,filename, filesize = struct.unpack(head_packet_format,data)     filepath = os.path.join(nagios_perfdata, filepath[:filepath_len])     filename = filename[:filename_len]     self.logger.debug("update file: %s" % filepath + '/' + filename)    try:       if not os.path.exists(filepath):         os.makedirs(filepath)     except OSError:       pass     self.fd = open(os.path.join(filepath,filename), 'w')     #self.fd = open(filename,'w')     if filesize > self.chunk_size:       times = filesize / self.chunk_size       first_part_size = times * self.chunk_size       second_part_size = filesize % self.chunk_size       while 1:         try:           data = self.recv(self.chunk_size)           #self.logger.debug("handle_read()->%s size.",len(data))         except socket.error,e:           if e.args[0] == errno.EWOULDBLOCK:             print "EWOULDBLOCK"             time.sleep(1)           else:             #self.logger.debug("Error happend while receive data: %s" % e)             break         else:           self.fd.write(data)           self.fd.flush()           self.writen_size += len(data)           if self.writen_size == first_part_size:             break       #receive the packet at last       while 1:         try:           data = self.recv(second_part_size)           #self.logger.debug("handle_read()->%s size.",len(data))         except socket.error,e:           if e.args[0] == errno.EWOULDBLOCK:             print "EWOULDBLOCK"             time.sleep(1)           else:             #self.logger.debug("Error happend while receive data: %s" % e)             break         else:           self.fd.write(data)           self.fd.flush()           self.writen_size += len(data)           if len(data) == second_part_size:             break     elif filesize <= self.chunk_size:       while 1:         try:           data = self.recv(filesize)           #self.logger.debug("handle_read()->%s size.",len(data))         except socket.error,e:           if e.args[0] == errno.EWOULDBLOCK:             print "EWOULDBLOCK"             time.sleep(1)           else:             #self.logger.debug("Error happend while receive data: %s" % e)             break         else:           self.fd.write(data)           self.fd.flush()           self.writen_size += len(data)           if len(data) == filesize:             break     self.logger.debug("File size: %s" % self.writen_size) class SyncServer(asyncore.dispatcher):   def __init__(self,host,port):     asyncore.dispatcher.__init__(self)     self.debug = True     self.logger = logging.getLogger(self.__class__.__name__)     self.create_socket(socket.AF_INET,socket.SOCK_STREAM)     self.set_reuse_addr()     self.bind((host,port))     self.listen(2000)   def handle_accept(self):     client_socket = self.accept()     if client_socket is None:       pass     else:       sock, addr = client_socket       #self.logger.debug("Incoming connection from %s" % repr(addr))       handler = RequestHandler(sock=sock) class RunServer(threading.Thread):   def __init__(self):     super(RunServer,self).__init__()     self.daemon = False   def run(self):     server = SyncServer('',9999)     asyncore.loop(use_poll=True) def StartServer():   logging.basicConfig(level=logging.DEBUG,             format='%(name)s: %(message)s',             )   RunServer().start()   #MakeGraph().start() if __name__ == '__main__':   StartServer()

客户端:

# monitor path with inotify(python module), and send them to remote server.# # use sendfile(2) instead of send function in socket, if we have python-sendfile installed.# import socket import time import os import sys import struct import threading import Queue try:    import pyinotify except (ImportError, ImportWarnning):    print "Hope this information can help you:"    print "Can not find pyinotify module in sys path, just run [apt-get install python-pyinotify] in ubuntu."    sys.exit(1) try:    from sendfile import sendfile except (ImportError,ImportWarnning):    pass filetype_filter = [".rrd",".xml"] def check_filetype(pathname):    for suffix_name in filetype_filter:      if pathname[-4:] == suffix_name:        return True    try:      end_string = pathname.rsplit('.')[-1:][0]      end_int = int(end_string)    except:      pass    else:      # means pathname endwith digit      return False class sync_file(threading.Thread):    def __init__(self, addr, events_queue):      super(sync_file,self).__init__()      self.daemon = False      self.queue = events_queue      self.addr = addr      self.chunk_size = 1024    def run(self):      while 1:        event = self.queue.get()        if check_filetype(event.pathname):          print time.asctime(),event.maskname, event.pathname          filepath = event.path.split('/')[-1:][0]          filename = event.name          filesize = os.stat(os.path.join(event.path, filename)).st_size          sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)          filepath_len = len(filepath)          filename_len = len(filename)          sock.connect(self.addr)          offset = 0          data = struct.pack("!LL128s128sL",filepath_len, filename_len, filepath,filename,filesize)          fd = open(event.pathname,'rb')          sock.sendall(data)          if "sendfile" in sys.modules:            # print "use sendfile(2)"            while 1:              sent = sendfile(sock.fileno(), fd.fileno(), offset, self.chunk_size)              if sent == 0:                break              offset += sent          else:            # print "use original send function"            while 1:              data = fd.read(self.chunk_size)              if not data: break              sock.send(data)          sock.close()          fd.close() class EventHandler(pyinotify.ProcessEvent):    def __init__(self, events_queue):      super(EventHandler,self).__init__()      self.events_queue = events_queue    def my_init(self):      pass    def process_IN_CLOSE_WRITE(self,event):      self.events_queue.put(event)    def process_IN_MOVED_TO(self,event):      self.events_queue.put(event) def start_notify(path, mask, sync_server):    events_queue = Queue.Queue()    sync_thread_pool = list()    for i in range(500):      sync_thread_pool.append(sync_file(sync_server, events_queue))    for i in sync_thread_pool:      i.start()    wm = pyinotify.WatchManager()    notifier = pyinotify.Notifier(wm,EventHandler(events_queue))    wdd = wm.add_watch(path,mask,rec=True)    notifier.loop() def do_notify():    perfdata_path = '/var/lib/pnp4nagios/perfdata'    mask = pyinotify.IN_CLOSE_WRITE|pyinotify.IN_MOVED_TO    sync_server = ('127.0.0.1',9999)    start_notify(perfdata_path,mask,sync_server) if __name__ == '__main__':    do_notify()

python监视线程池

#!/usr/bin/python import threading import time class Monitor(threading.Thread):   def __init__(self, *args,**kwargs):     super(Monitor,self).__init__()     self.daemon = False     self.args = args     self.kwargs = kwargs     self.pool_list = []   def run(self):     print self.args     print self.kwargs     for name,value in self.kwargs.items():       obj = value[0]       temp = {}       temp[name] = obj       self.pool_list.append(temp)     while 1:       print self.pool_list       for name,value in self.kwargs.items():         obj = value[0]         parameters = value[1:]         died_threads = self.cal_died_thread(self.pool_list,name)        print "died_threads", died_threads         if died_threads >0:           for i in range(died_threads):             print "start %s thread..." % name             t = obj[0].__class__(*parameters)             t.start()             self.add_to_pool_list(t,name)         else:           break       time.sleep(0.5)   def cal_died_thread(self,pool_list,name):     i = 0     for item in self.pool_list:       for k,v in item.items():         if name == k:           lists = v     for t in lists:       if not t.isAlive():         self.remove_from_pool_list(t)         i +=1     return i   def add_to_pool_list(self,obj,name):     for item in self.pool_list:       for k,v in item.items():         if name == k:           v.append(obj)   def remove_from_pool_list(self, obj):     for item in self.pool_list:       for k,v in item.items():         try:           v.remove(obj)         except:           pass         else:           return

使用方法:

rrds_queue = Queue.Queue()   make_rrds_pool = []   for i in range(5):     make_rrds_pool.append(MakeRrds(rrds_queue))   for i in make_rrds_pool:     i.start()   make_graph_pool = []   for i in range(5):     make_graph_pool.append(MakeGraph(rrds_queue))   for i in make_graph_pool:     i.start()   monitor = Monitor(make_rrds_pool=(make_rrds_pool, rrds_queue), \            make_graph_pool=(make_graph_pool, rrds_queue))   monitor.start()

解析:

1. 接受字典参数,value为一个元组,第一个元素是线程池,后面的都是参数。
2. 每0.5秒监视线程池中的线程数量,如果线程死掉了,记录死掉线程的数目,再启动同样数量的线程。
3. 如果没有线程死去,则什么也不做。

从外部调用Django模块

import os import sys sys.path.insert(0,'/data/cloud_manage') from django.core.management import setup_environ import settings setup_environ(settings) from common.monitor import Monitor from django.db import connection, transaction

前提就是,要新建一个django的project,这里我们新建了一个cloud_manage.
这样不仅可以调用django自身的模块,还能调用project本身的东西。

希望本文所述对大家的Python程序设计有所帮助。

(责任编辑:admin)






帮助中心
会员注册
找回密码
新闻中心
快捷通道
域名登录面板
虚机登录面板
云主机登录面板
关于我们
关于我们
联系我们
联系方式

售前咨询:17830004266(重庆移动)

企业QQ:383546523

《中华人民共和国工业和信息化部》 编号:ICP备00012341号

Copyright © 2002 -2018 香港云主机 版权所有
声明:香港云主机品牌标志、品牌吉祥物均已注册商标,版权所有,窃用必究

云官方微信

在线客服

  • 企业QQ: 点击这里给我发消息
  • 技术支持:383546523

  • 公司总台电话:17830004266(重庆移动)
  • 售前咨询热线:17830004266(重庆移动)