在Synology群晖NAS上使用shadowsocks-libev、polipo

  翻墙一直主要使用的是SSH动态端口映射,SSH代理数据传输的安全性自然没必要担心,但据说有较明显的流量特征,服务器较易暴露,再加上SSH代理在Windows下使用并不那么友好,因此在自己购买了VPS后就决定要用shadowsocks做代理服务器。

  Python版的shadowsocks因作者clowwindy被请喝茶早就停止更新,托开源的福,shadowsock有众多其它语言版本,比如纯C的shadowsocks-libev,较低的资源消耗更适合配置一般的VPS和我那内存仅区区128M的NAS。

  VPS操作系统是Ubuntu,因此服务端不论是自己编译还是从软件源安装都很简单,Windows和Mac OS X上也有客户端可以直接使用,唯一有点麻烦的就是群晖NAS。NAS配置低的惨不忍睹,而且缺少编译环境,只能在其他Linux服务器上进行交叉编译。关于交叉编译,Synology提供了详尽的开发指南可供参考。

一、环境准备

  1. 准备编译服务器,一般常见的Linux发行版都没有问题,但需要注意的是如果是32位的系统需要额外安装libc6-i386,我是在Paralels Desktop虚拟机里安装的Ubuntu 16.04 amd64。

  2. 确定NAS的CPU类型、DSM版本,下载对应的Tool Chains,如我的老爷机DS211j,CPU: Marvell mv6282 DSM: 6.0,需要下载的就是6281-gcc464_glibc215_88f6281-GPL.txz

  3. 上传Tool Chains到编译服务器。

  注意:下文脚本假定工作目录为/home/jinnlynn/nas,Tool Chains也被上传到该目录下

二、编译
# 工作目录,前文下载的6281-gcc464_glibc215_88f6281-GPL.txz在该目录下
WDIR=/home/jinnlynn/nas
# 目标目录,编译好的文件将会保存在这
DIST=$WDIR/dist

cd $WDIR

# 安装工具
sudo apt-get -y install make binutils
# 64位系统上需安装32位libc
uname -p | grep -q 64 && sudo apt -y install libc6-i386

# 解压Tool Chains,将生成arm-marvell-linux-gnueabi文件夹
tar xvf 6281-gcc464_glibc215_88f6281-GPL.txz

# 编译环境变量,不同的CPU环境可能不同,详见Synology开发指南的Compile Open Source Projects章节
export CC=$WDIR/arm-marvell-linux-gnueabi/bin/arm-marvell-linux-gnueabi-gcc
export LD=$WDIR/arm-marvell-linux-gnueabi/bin/arm-marvell-linux-gnueabi-ld
export RANLIB=$WDIR/arm-marvell-linux-gnueabi/bin/arm-marvell-linux-gnueabi-ranlib
export CFLAGS="-I$WDIR/arm-marvell-linux-gnueabi/arm-marvell-linux-gnueabi/libc/include"
export LDFLAGS="-L$WDIR/arm-marvell-linux-gnueabi/arm-marvell-linux-gnueabi/libc/lib"

# 依赖zlib,下载编译
curl -O http://zlib.net/zlib-1.2.8.tar.gz
tar xvf zlib-1.2.8.tar.gz
cd zlib-1.2.8/
./configure --prefix=$DIST/zlib-1.2.8
make install
cd $WDIR

# 依赖openssl,下载编译
curl -O https://www.openssl.org/source/openssl-1.0.2h.tar.gz
tar xvf openssl-1.0.2h.tar.gz
cd openssl-1.0.2h
./Configure dist --prefix=$DIST/openssl-1.0.2h
make
make install
cd $WDIR

# 编译shadowsocks-libev
curl -OL https://github.com/shadowsocks/shadowsocks-libev/archive/v2.4.6.tar.gz
tar xvf v2.4.6.tar.gz
cd shadowsocks-libev-2.4.6
# 配置 需要注意的是--host选项,目标NAS不同值可能也会不同
# 详见Synology开发指南的Compile Open Source Projects章节
./configure \
    --with-zlib=$DIST/zlib-1.2.8 \
    --with-openssl=$DIST/openssl-1.0.2h \
    --prefix=$DIST/shadowsocks-libev-2.4.6 \
    --host=armle-unknown-linux
make
make install
cd $WDIR

  将生成的shadowsocks-libev-2.4.6目录拷贝到NAS即可正常使用了。

三、编译polipo

  shadowsocks-libev是socks5代理,但某些应用可能只能使用HTTP代理,因此需要转换,软件个人推荐polipo,轻量高效,ipkg软件源上也有但版本较低,最好也自己编译。

# 工作目录
WDIR=/home/jinnlynn/nas
# 目标目录,编译好的文件将会保存在这
DIST=$WDIR/dist/

cd $WDIR

# 安装工具
sudo apt-get -y install texinfo

# 编译环境变量,不同的CPU环境可能不同,详见Synology开发指南的Compile Open Source Projects章节
export CC=$WDIR/arm-marvell-linux-gnueabi/bin/arm-marvell-linux-gnueabi-gcc
export LD=$WDIR/arm-marvell-linux-gnueabi/bin/arm-marvell-linux-gnueabi-ld
export RANLIB=$WDIR/arm-marvell-linux-gnueabi/bin/arm-marvell-linux-gnueabi-ranlib
export CFLAGS="-I$WDIR/arm-marvell-linux-gnueabi/arm-marvell-linux-gnueabi/libc/include"
export LDFLAGS="-L$WDIR/arm-marvell-linux-gnueabi/arm-marvell-linux-gnueabi/libc/lib"

curl -OL https://github.com/jech/polipo/archive/polipo-1.1.1.tar.gz
tar xvf polipo-1.1.1.tar.gz
cd polipo-polipo-1.1.1/

make all

# polipo的make install默认将安装到当前系统目录
# 这里自己拷贝出所需文件
PREFIX=$DIST/polipo-1.1.1
mkdir -p $PREFIX/bin
mkdir -p $PREFIX/share/www/doc
rm -f $PREFIX/bin/polipo
cp -f polipo $PREFIX/bin/
cp -f html/* $PREFIX/share/www/doc
cp -f localindex.html $PREFIX/share/www/index.html
mkdir -p $PREFIX/man/man1
mkdir -p $PREFIX/info
cp -f polipo.man $PREFIX/man/man1/polipo.1
cp polipo.info $PREFIX/info/

  同样的拷贝polipo-1.1.1目录到NAS即可,要注意的一点是运行时可能出现Disabling disk cache: No such file or directoryDisabling local tree: No such file or directory提示,这是因为编译默认的缓存目录和本地文档目录在NAS上不一定存在,只要运行时给polipo添加localDocumentRootdiskCacheRoot选项,设置正确的目录就没有问题了。

Read more...

Alfred Workflow: 管理Synology Download Station

  Download StationSynology DSM里我最喜欢的应用之一,我也曾写过一个在终端控制它的脚本,Alfred的Workflow出现后我想到如果可能或许它是本地管理DS最方便的方式了,于是就有了这个,Download Station下载任务的创建、查看、暂停、恢复、删除等操作自然完全支持,也支持一些基本设置的管理如eMule、计划的启用与禁用等。

DS Main

DownloadSource


Read more...

Synology群晖NAS外部IP更新通知

  在我的群晖DS211j上使用DDNS服务时经常出现长时间无法正常更新IP的现象1,这对于需要经常从外部访问的我来说是个很大的问题,于是写了段Python2脚本在NAS上监控外部IP地址的变化,如果发现改变将发送新的IP地址到指定的邮箱。3

1. 脚本

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#! 强制默认编码为utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf8')

# 配置
# SMTP服务器 用户 密码
smtp_server = 'SMTP SERVER'
smtp_port   = 25
smtp_usr    = 'SMTP USERNAME'
smtp_pwd    = 'SMTP PASSWORD'

# 发送接受邮箱地址
#! 发送邮箱需是smtp_usr有权操作的邮箱
from_addr   = 'FROM EMAIL ADDRESS'
to_addr     = 'TO EMAIL ADDRESS'

# 获取外网IP的网址 可以是
# http://ifconfig.me/ip
# http://ip.3322.net
# http://members.3322.org/dyndns/getip
ip_check_server = 'http://ip.3322.net'

# 记录文件路径
log_file    = '/tmp/ipcheck.log'

# 邮件主题 
# 内容为新的IP
mail_subject = 'IP check message'

import os, urllib2, smtplib
from datetime import datetime

class CheckIP(object):
    def __init__(self):
        self.logs = []
        self.openLog()

    def __del__(self):
        self.saveLog()

    def openLog(self):
        if not os.path.isfile(log_file):
            open(log_file, 'w').close()
        with open(log_file, 'r') as f:
            self.logs = f.readlines()
        if not self.logs or len(self.logs) < 2:
            self.logs = ['\n', '----\n']
        # 确保第二行永远是分隔符
        self.logs[1] = '----\n'

    def saveLog(self):
        with open(log_file, 'w') as f:
            f.writelines(self.logs)

    def getOldIP(self):
        return self.logs[0].strip()

    def setNewIP(self, new_ip):
        if new_ip:
            self.logs[0] = '{}\n'.format(new_ip)

    def sendMail(self, msg):
        if not msg:
            return
        data = {'from_addr' : from_addr,
                'to_addr'   : to_addr,
                'subject'   : mail_subject,
                'msg'       : msg }
        msg = 'From: {from_addr}\r\nTo: {to_addr}\r\nSubject: {subject}\r\n\r\n{msg}'.format(**data)
        try:
            smtp = smtplib.SMTP()
            # smtp.set_debuglevel(1)
            smtp.connect(smtp_server, smtp_port)
            smtp.login(smtp_usr, smtp_pwd)
            smtp.sendmail(from_addr, to_addr, msg)   
            smtp.quit()
        except Exception, e:
            return False, 'send mail fail. {}'.format(e)
        return True, None

    def log(self, msg, need_mail = False):
        if not msg:
            return
        print(msg)
        log_msg = '{}\t{}\n'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), msg)
        # 检查最近的记录,如果相同仅更新时间
        try:
            last_log = self.logs[2].split('\t')[1].strip()
            if last_log == msg:
                self.logs[2] = log_msg
            else:
                self.logs.insert(2, log_msg)
        except Exception, e:
            self.logs.insert(2, log_msg)

        if need_mail:
            self.sendMail(msg)

    def checkIP(self):
        old_ip = self.getOldIP()
        try:
            url = urllib2.Request(ip_check_server)
            res = urllib2.urlopen(url)
            new_ip = res.read().strip('\r\n ')
        except Exception, e:
            self.log('get ip fail. {}'.format(e), True)
            return
        if old_ip == new_ip:
            return self.log('IP unchanged.')
        res, msg = self.sendMail('IP Changed. {}'.format(new_ip))
        if not res:
            return self.log(msg)
        self.setNewIP(new_ip)
        self.log('IP changed. {}'.format(new_ip))

    def cleanLog(self):
        self.logs[2:] = []

    def run(self):
        if len(sys.argv) == 1:
            return self.checkIP()
        if '--clean' in sys.argv:
            return self.cleanLog()
        print('argument error.')

if __name__ == '__main__':
    ip = CheckIP()
    ip.run()

Source on gist

  • 将上述代码另存为文本,并按脚本中的注释正确设置SMTP服务器、账户、密码、发送及接收邮箱地址
  • 将文件上传至NAS,如/root/ipcheck.py
  • 使用root账户登陆NAS将该文件权限设置为可执行chmod 744 /root/ipcheck.py
  • 执行/root/ipcheck.py测试,正常情况下你设置的接收邮箱将会收到一个包含NAS当前IP地址的邮件,如果没有请检查前面的操作是否正确。

2. 设定定时任务

  用root账户执行vi /etc/crontab4,在打开的文件末尾添加如下内容:

*/10        *        *        *        *        root        /root/ipcheck.py

  其中*/10意为每10分钟检查一次外部IP。

  最后执行命令重启cron

/usr/syno/etc.defaults/rc.d/S04crond.sh stop && sleep 1 && /usr/syno/etc.defaults/rc.d/S04crond.sh start

  至此,NAS将每隔一段时间检查外部IP,一旦发现地址变化就会发送邮件通知。

3. 其它

  • /tmp/ipcheck.txt为默认保存当前ip地址信息及日志的文件,每次检查ip地址时都跟此文件内容比较,如果不同则发送通知邮件

REF:

  向StartSSL申请个人域名SSL证书


  1. 貌似这是群晖DDNS服务的通病,在DSM4.2已经允许设置多个DDNS服务,不过为以防万一,自行监控IP变化也不失为一个好的预防手段。 

  2. 群晖NAS DSM上默认不自带Python,需要从套件中心中安装。 

  3. 不仅仅只适用于群晖NAS,只要安装了Python的*nix系统均可正常使用。 

  4. 简单的vi操作: i进入编辑模式 esc 退出当前模式 : 进入命令行输入模式 w 保存内容 q 退出。 

向StartSSL申请个人域名SSL证书

  为方便在外时访问,NAS使用了DDNS服务并绑定了jeeker.net的一个子域名,同时启用了HTTPS连接以增强安全性,于是由于证书问题,每次访问NAS总出现烦人的警告。

Chrome SSL Warning

  解决这个问题的唯一方法当然就是安装一个有效的SSL证书,但向CA机构申请证书一年少者几十多者数千美元,很不划算,好在互联网上总不缺免费午餐StartSSL就是很好的选择,下面就以它为例了解下SSL证书的申请和安装过程。

Read more...

命令行远程控制Synology DSM Download Station

  入手Synology DS211j近两年,所有下载工作都是通过其自带的Download Station来完成,毫无疑问,DS的功能已经足够强大,不过每次添加任务都要先登陆DSM,总让我觉得有点麻烦,一直想通过本地终端直接控制DS,但始终不得其法。

  直到这两天才发现其实从DSM 3.2开始DS就已经提供了官方API,看了下文档,接口如同常见的网络轮询服务,也不复杂,于是写了一段Python脚本,支持在终端查看、添加、暂停、恢复、清理下载任务以及禁用或启用eMule,基本能够满足日常应用了。

  代码结构很简单,如何使用直接看源代码吧,不多废话了。

#! /usr/bin/env python
# -*- coding: utf-8 -*-

## 
# 使用Synology Download Station API远程控制文件下载
# http://download.synology.com/download/other/Synology_Download_Station_Official_API_V3.pdf
# Created by JinnLynn 2013.01.15 http://jeeker.net 
##

import json, urllib, urllib2, os, sys
from argparse import ArgumentParser
from pprint import pprint
reload(sys)
sys.setdefaultencoding('utf8')

# 设置: 网址 用户 密码
NAS_URL = ''
NAS_USR = ''
NAS_PWD = ''

CommonCodeDesc = {  100: 'Unknown error',
                    101: 'Invalid parameter',
                    102: 'The requested API does not exist',
                    103: 'The requested method does not exist',
                    104: 'The requested version does not support the functionality',
                    105: 'The logged in session does not have permission',
                    106: 'Session timeout',
                    107: 'Session interrupted by duplicate login'
                }

AuthCodeDesc = {    400: 'No such account or incorrect password',
                    401: 'Guest account disabled',
                    402: 'Account disabled',
                    403: 'Wrong password',
                    404: 'Permission denied'
                }

DSTaskCodeDesc = {  400: 'File upload failed',
                    401: 'Max number of tasks reached',
                    402: 'Destination denied',
                    403: 'Destination does not exist',
                    404: 'Invalid task id',
                    405: 'Invalid task action'
                }

DS_VERSION = '0.1'

class DownloadStation:
    def __init__(self):
        self.sessionID = ''
        self.tryLogin()
        self.debugInfo = {}
        self.lastError = ''

    def post(self, cgipath, data, subdir=''):
        if not isinstance(data, dict):
            print('data error')
            return
        if not data.has_key('version'):
            data.update({'version': 1})
        if not data.has_key('_sid'):
            data.update({'_sid': self.sessionID})
        data = urllib.urlencode(data)
        url = os.path.join(NAS_URL, 'webapi', subdir, cgipath)
        try:
            requst = urllib2.urlopen(url, data)
            res = json.load(requst)
        except:
            res = {'error': {'code': -1},'success': False}
        return res

    def login(self):
        data = { 'api':     'SYNO.API.Auth',
                 'method':  'login',
                 'version': 2,
                 'account': NAS_USR,
                 'passwd':  NAS_PWD,
                 'session': 'DownloadStation',
        }
        res = self.post('auth.cgi', data)
        if not self.isSuccess(res):
            self.die( self.lastError )
        self.sessionID = res['data']['sid']

    def logout(self):
        data = { 'api':     'SYNO.API.Auth',
                 'method':  'logout',
                 'session': 'DownloadStation'
        }
        res = self.post('auth.cgi', data)
        self.sessionID = ''

    def tryLogin(self):
        if self.sessionID == '':
            self.login()

    def test(self):
        self.tryLogin()
        data = { 'api':     'SYNO.DownloadStation.Info',
                 'method':  'getinfo'
        }
        res = self.post('DownloadStation/info.cgi', data)
        if not self.isSuccess(res):
            self.die( self.lastError )
        manager_info = ', and you are manager.' if res['data']['is_manager'] else '.'
        info = 'everything is ok, the version of Download Station is {}{}'.format(res['data']['version_string'], manager_info)
        print(info)


    # 显示当前下载速度
    def showStatistic(self):
        self.tryLogin()
        data = { 'api':     'SYNO.DownloadStation.Statistic',
                 'method':  'getinfo'
        }
        res = self.post('statistic.cgi', data, 'DownloadStation')
        if not self.isSuccess(res):
            self.die( self.lastError )
        speed_download          = self.humanSize(res['data']['speed_download'])
        speed_upload            = self.humanSize(res['data']['speed_upload'])
        info = '{:15} download {:10} upload {:10}'.format('Speed:', speed_download, speed_upload)
        print(info)
        if res['data'].has_key('emule_speed_download'):
            emule_speed_download    = self.humanSize(res['data']['emule_speed_download'])
            emule_speed_upload      = self.humanSize(res['data']['emule_speed_upload'])
            info = '{:15} download {:10} upload {:10}'.format('eMule Speed:', emule_speed_download, emule_speed_upload)
            print(info)
        else:
            print('eMule disabled.')

    # 显示当前任务信息
    def showTask(self, simple = False, include_seeding = False):
        self.tryLogin()
        if not simple:
            self.showStatistic()
        data = { 'api':         'SYNO.DownloadStation.Task',
                 'method':      'list',
                 'additional':  'transfer'
        }
        res = self.post('DownloadStation/task.cgi', data)
        if not self.isSuccess(res):
            self.die( self.lastError )
        if not include_seeding:
            res['data']['tasks'] = filter(lambda t: t['status'] != 'seeding', res['data']['tasks'])
        if len(res['data']['tasks']) == 0:
            print('Task list is empty.')
            return
        title = '{:10}{:15}{:10}{:15}{:15}{}'.format('Type', 'Status', 'Size', 'Downloaded', 'Speed', 'Title')
        if simple:
            title = '{:40}{}'.format('Task ID', 'Title')
        print(title)

        for task in res['data']['tasks']:
            task['size'] = self.humanSize(task['size']);
            task['add_size_d'] = self.humanSize(task['additional']['transfer']['size_downloaded'])
            task['add_sued'] = self.humanSize(task['additional']['transfer']['size_uploaded'])
            task['add_speed_d'] = self.humanSize(task['additional']['transfer']['speed_download'])
            task['add_su'] = self.humanSize(task['additional']['transfer']['speed_upload'])
            info = '{type:10}{status:15}{size:10}{add_size_d:15}{add_speed_d:15}{title:30}'.format(**task)
            if simple:
                info = '{id:40}{title}'.format(**task)
            print(info)

    # 创建新的下载任务 参数可以是本地文件或链接
    def createTask(self, link_or_file):
        self.tryLogin()
        if not isinstance(link_or_file, (str, unicode, list)):
            self.die('arguments error')
        uris = ','.join(link_or_file) if isinstance(link_or_file, list) else link_or_file
        try:
            if os.path.isfile(uris):
                # 使用file参数似乎有问题 自己读
                with open(uris, 'r') as fp:
                    uris = ','.join(map(lambda s: s.strip('\n \t\r'), fp.readlines()))
        except:
            pass
        data = { 'api':     'SYNO.DownloadStation.Task',
                 'method':  'create',
                 'uri':     uris
        }
        res = self.post('DownloadStation/task.cgi', data)
        if not self.isSuccess(res):
            self.die( self.lastError )
        print('task created.')

    # 清理错误 及 已完成的任务
    #! eMule任务中 有seeding状态的无法删除
    def cleanTask(self):
        self.tryLogin()
        # 获取任务列表
        data = { 'api':         'SYNO.DownloadStation.Task',
                 'method':      'list',
        }
        res = self.post('DownloadStation/task.cgi', data)
        if not self.isSuccess(res):
            self.die( self.lastError )
        need_clean = {}
        for task in res['data']['tasks']:
            # 错误 已完成 的清理
            if task['status'] == 'error' or task['status'] == 'finished':
                need_clean.update({task['id']: task['title']})
        if not len(need_clean):
            print('there is no task need to clean.')
            return
        data = { 'api':     'SYNO.DownloadStation.Task',
                 'method':  'delete',
                 'id':      ','.join(need_clean.keys())
        }
        res = self.post('DownloadStation/task.cgi', data)
        if not self.isSuccess(res):
            self.die( self.lastError )
        for task in res['data']:
            clean_res = 'clean success:' if task['error'] == 0 else 'clean fail(%d):' % task['error']
            title = need_clean[task['id']] if need_clean.has_key(task['id']) else task['id']
            info = '{:20}{}'.format(clean_res, title)
            print(info)

    def pauseTask(self, task_id = ''):
        self.tryLogin()
        # 未自定id则暂停所有
        if not task_id:
            # 获取任务列表
            data = { 'api':         'SYNO.DownloadStation.Task',
                     'method':      'list',
            }
            res = self.post('DownloadStation/task.cgi', data)
            if not self.isSuccess(res):
                self.die( self.lastError )
            ids = []
            for task in res['data']['tasks']:
                # eMule做种的任务无法暂停
                if not task['status'] == 'seeding':
                    ids.append(task['id'])
            if not len(ids):
                self.die('there is no task can be paused.')
            task_id = ',' . join(ids)

        data = { 'api': 'SYNO.DownloadStation.Task',
                 'method': 'pause',
                 'id':      task_id
        }
        res = self.post('DownloadStation/task.cgi', data)
        if not self.isSuccess(res):
            self.die( self.lastError )
        for task in res['data']:
            info = 'pause success:' if task['error'] == 0 else 'pause fail(%d):' % task['error']
            print( '{:20}{}'.format(info, task['id']) )

    def resumeTask(self, task_id = ''):
        self.tryLogin()
        # 未自定id则恢复所有
        if not task_id:
            # 获取任务列表
            data = { 'api':         'SYNO.DownloadStation.Task',
                     'method':      'list',
            }
            res = self.post('DownloadStation/task.cgi', data)
            if not self.isSuccess(res):
                self.die( self.lastError )
            ids = []
            for task in res['data']['tasks']:
                if task['status'] == 'paused':
                    ids.append(task['id'])
            if not len(ids):
                self.die('there is no task can be paused.')
            task_id = ',' . join(ids)
        data = { 'api':     'SYNO.DownloadStation.Task',
                 'method':  'resume',
                 'id':      task_id
        }
        res = self.post('DownloadStation/task.cgi', data)
        if not self.isSuccess(res):
            self.die( self.lastError )
        for task in res['data']:
            info = 'resume success:' if task['error'] == 0 else 'resume fail(%d):' % task['error']
            print( '{:20}{}'.format(info, task['id']) )

    def deleteTask(self, task_id):
        self.tryLogin()
        # 获取待删除任务信息
        data = { 'api':     'SYNO.DownloadStation.Task',
                 'method':  'getinfo',
                 'id':      task_id
        }
        res = self.post('DownloadStation/task.cgi', data)
        if not self.isSuccess(res):
            self.die( self.lastError )
        if not len(res['data']['tasks']):
            self.die('task is non-existent.')
        # 如果任务未完成 且 无错误 请求确认
        need_confirm = False
        for task in res['data']['tasks']:
            if task['status'] != 'error' and task['status'] != 'finished':
                need_confirm = True
                print( '{:20}{:40}{}'.format('task uncompleted:', task['id'], task['title']) )
        if need_confirm:
            if not raw_input('are you sure to delete it(yes/no)?:') == 'yes':
                return
        # 删除
        data = { 'api':     'SYNO.DownloadStation.Task',
                 'method':  'delete',
                 'id':      task_id
        }
        res = self.post('DownloadStation/task.cgi', data)
        if not self.isSuccess(res):
            self.die( self.lastError )
        print('task deleted.')

    def eMule(self, is_enable):
        data = { 'api':             'SYNO.DownloadStation.Info',
                 'method':          'setserverconfig',
                 'emule_enabled':   'false' if not is_enable else 'true'
        }
        res = self.post('DownloadStation/info.cgi', data)
        if not self.isSuccess(res):
            self.die( self.lastError )
        print('eMule was %s' % ('enabled' if is_enable else 'disabled'))

    def isSuccess(self, res):
        if not isinstance(res, dict) or not res.has_key('success'):
            self.lastError = 'something error'
            return False
        if res['success']:
            self.lastError = ''
            return True
        self.lastError = 'Fail. error code: %d' % res['error']['code'] if res.has_key('error') and res['error'].has_key('code') else 'something error'
        return False

    def humanSize(self, byte):
        if isinstance(byte, (str, unicode)):
            byte = int(byte) if byte.isnumeric() else 0
        size = byte / 1024.0
        unit = 'KB'
        if size > 1024:
            size = size / 1024.0
            unit = 'MB'
        if size > 1024:
            size = size / 1024.0
            unit = 'GB'
        return '{:.2f}{}'.format(size, unit)

    def die(self, msg=''):
        if len(msg):
            sys.stderr.write(msg + '\n')
        sys.exit(1)

def main():
    parser = ArgumentParser(prog = 'ds', description = 'Synology NAS Download Station Tool.')
    parser.add_argument('-v', '--version', action='version', version='%(prog)s ' + DS_VERSION)
    subparsers = parser.add_subparsers(title = 'subcommands', dest = 'sub_cmd')

    # 任务信息
    sub_info_parser = subparsers.add_parser('info', help = 'show task infomation')
    sub_info_parser.add_argument('--simple', action='store_true', default = False, help = 'only show task id and title')
    sub_info_parser.add_argument('--all', action='store_true', default = False, help = 'show all task')

    # 创建新任务
    sub_create_parser = subparsers.add_parser('create', help='create new task')
    sub_create_parser.add_argument('link_or_file', nargs = 1, help='single link or a file which include multi-links')

    # 清理任务
    sub_clean_parser = subparsers.add_parser('clean', help = 'clean error or completed task')

    # 暂停任务
    sub_pause_parser = subparsers.add_parser('pause', help = 'pause task')
    sub_pause_parser.add_argument('task_id', action='store', nargs='?', default = '')

    # 恢复任务
    sub_resume_parser = subparsers.add_parser('resume', help = 'resume task')
    sub_resume_parser.add_argument('task_id', nargs = '?', default = '')

    # 删除任务
    sub_delete_parser = subparsers.add_parser('delete', help = 'delete task')
    sub_delete_parser.add_argument('task_id')

    # eMule控制
    sub_emule_parser = subparsers.add_parser('emule', help = 'enable or disable eMule')
    sub_emule_parser.add_argument('operate', choices=['on', 'off'])

    # 测试
    sub_test_parser = subparsers.add_parser('test', help = 'test API')

    # 如果没有参数则默认输出info
    args = parser.parse_args(['info']) if len(sys.argv) == 1 else parser.parse_args()

    cmd_map = { 'info':     lambda: ds.showTask(args.simple, args.all),
                'create':   lambda: ds.createTask( args.link_or_file ),
                'clean':    lambda: ds.cleanTask(),
                'pause':    lambda: ds.pauseTask( args.task_id ),
                'resume':   lambda: ds.resumeTask( args.task_id ),
                'delete':   lambda: ds.deleteTask( args.task_id ),
                'emule':    lambda: ds.eMule( True if args.operate == 'on' else False),
                'test':     lambda: ds.test()
    }
    if cmd_map.has_key(args.sub_cmd):
        ds = DownloadStation()
        cmd_map[args.sub_cmd]()
        ds.logout()
    else:
        print('something error.')

if __name__ == '__main__':
    main()

Source on GitHub