当前位置:网站首页>telnet+ftp to control and upgrade the device

telnet+ftp to control and upgrade the device

2022-08-09 13:43:00 Hong Dayu

import telnetlib
import time
from ftplib import FTP
import serial
import configparser
import signal
import os
import tarfile

class TelnetClient():
    def __init__(self, host_ip, username, password):
        self.tn = telnetlib.Telnet()
        self.host_ip = host_ip
        self.username = username
        self.password = password

    def login_host(self):
        try:
            self.tn.open(self.host_ip, port=23)
        except:
            print('%s网络连接失败' % self.host_ip)
            return False
        self.tn.read_until(b'login: ', timeout=10)
        self.tn.write(self.username.encode('ascii') + b'\n')
        self.tn.read_until(b'Password: ', timeout=10)
        self.tn.write(self.password.encode('ascii') + b'\n')
        command_result = self.tn.read_very_eager().decode('ascii')
        if 'Login incorrect' not in command_result:
            print('%s登录成功' % self.host_ip)
            return True
        else:
            print('%s登录失败,用户名或密码错误' % self.host_ip)
            return False

    def execute_some_command(self, command,mode="nowait"):
        self.tn.write(command.encode('ascii')+b'\n')
        time.sleep(5)
        print("\n***************************************************\n")
        if mode == "nowait":
            command_result = self.tn.read_very_eager().decode('ascii')
        elif mode == "wait":
            command_result = self.tn.read_until("DONE".encode('ascii')).decode('ascii')
        print('命令执行结果:\n%s' % command_result)
        print("\n***************************************************\n")

    def __del__(self):
        self.tn.write(b"exit\n")


def TelnetExecCmd(tn: TelnetClient, cmd: str,mode="nowait") -> bool:
    if tn.login_host():
        tn.execute_some_command(cmd,mode)
        return True
    else:
        return False


class FtpClient():
    def __init__(self, ip: str, username: str, password: str) -> None:
        self.ip = ip
        self.user = username
        self.password = password
        self.ftp = FTP()

    def connect(self):
        try:
            self.ftp.set_debuglevel(2)
            self.ftp.connect(self.ip, port=21)
            self.ftp.login(self.user, self.password)
            print(self.ftp.getwelcome())
        except:
            print("ftp connect {} error ...".format(self.ip))

    def put(self, filename: str):
        try:
            fp = open(filename, "rb")
            self.ftp.storbinary("STOR "+filename, fp)
        except:
            print("ftp upload {} error".format(filename))

    def get(self, filename: str):
        try:
            fp = open(filename, "wb")
            self.ftp.retrbinary("RETR "+filename, fp.write)
        except:
            print("ftp get {} error".format(filename))

    def __del__(self):
        self.ftp.set_debuglevel(0)
        self.ftp.quit()


class SerialClient:
    def __init__(self, port: str, bps: int) -> None:
        try:
            self.port = port
            self.bps = bps
            self.ser = serial.Serial(port, bps)
        except:
            print("Open Serial Error {}".format(port))

    def write(self, cmd: str) -> bool:
        if self.ser.isOpen():
            print("Open Serial {}".format(self.port))
            cmd = cmd+" \n"
            self.ser.write(cmd.encode("utf8"))
            return True
        else:
            return False

    #如果终端有响应一直读取数据
    def read(self):
        if self.ser.isOpen():
            while self.ser.readable():
                line = self.ser.readline(4096)
                print(line.decode('ascii'))

    def __del__(self):
        self.ser.close()

# AU DEVICE: 192.168.1.2
# RRU DEVICE: 192.168.98.98

def dos2unix(dos: str, unix: str):
    content = ''
    outsize = 0
    with open(dos, 'rb') as infile:
      content = infile.read()
    with open(unix, 'wb') as output:
      for line in content.splitlines():
        outsize += len(line) + 1
        output.write(line + b'\n')
    print("Done. Stripped %s bytes." % (len(content)-outsize))


def Exit(signum, frame):
    exit()

def CompressFile(TarFile:str,File:str,module:str):
    tar = tarfile.open(TarFile,module)
    for root,dir,files in os.walk(File):
        for file in files:
            fullpath = os.path.join(root,file)
            tar.add(fullpath)
    tar.close()

if __name__ == '__main__':
    signal.signal(signal.SIGINT, Exit)
    Parser = configparser.ConfigParser()

    config = "config.ini"

    if os.access(config,os.F_OK) == False:
        print("Error:Can't find upgrade.ini, Please check your ini config")
        exit()

    Parser.read(config)
    Parser.sections()

    IP = Parser["BaseConfig"]["ServerIpAddr"]
    Scripts = Parser["BaseConfig"]["UpgradeScript"]
    UserName = Parser["UserConfig"]["UserName"]
    PassWord = Parser["UserConfig"]["UserPassword"]
    Filename = Parser["UserConfig"]["UpgradeFilename"]
    CompressedMode = Parser["BaseConfig"]["CompressedMode"]
    
    dos2unix(Scripts,Scripts)
    
    if os.access(Filename+".tar",os.F_OK):
        os.remove(Filename+".tar")

    CompressFile(Filename+".tar",Filename,CompressedMode)

    ftp = FtpClient(IP,UserName,PassWord)
    ftp.connect()
    ftp.put(Scripts)
    ftp.put(Filename+".tar")

    os.remove(Filename+".tar")
    exec_upgrade = TelnetClient(IP,UserName,PassWord)
    CMD = "chmod +rwx "+ "/mnt/flash/"+Scripts+" && /mnt/flash/"+Scripts
    TelnetExecCmd(exec_upgrade,CMD,"wait")
    TelnetExecCmd(exec_upgrade,"rm /mnt/flash/"+Scripts+" ; rm -rf /run/"+Filename+"*")
    os.system("pause")
原网站

版权声明
本文为[Hong Dayu]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/221/202208091236552783.html