Alfred Workflow: 管理Synology Download Station

  Download Station是Synology DSM里我最喜欢的应用之一,我也曾写过一个在终端控制它的脚本。Alfred的Workflow出现后,我想到如果可能,它或许是在本地管理DS最方便的方式了,于是就有了这个Workflow。Download Station下载任务的创建、查看、暂停、恢复、删除等操作自然完全支持,也支持一些基本设置的管理,如eMule、计划的启用与禁用等。

DS Main

Download | Source


Read more...

命令行远程控制Synology DSM Download Station

  入手Synology DS211j近两年,所有下载工作都是通过其自带的Download Station来完成。毫无疑问,DS的功能已经足够强大,不过每次添加任务都要先登录DSM,总让我觉得有点麻烦,一直想通过本地终端直接控制DS,但始终不得其法。

  直到这两天才发现其实从DSM 3.2开始DS就已经提供了官方API,看了下文档,接口如同常见的网络轮询服务,也不复杂,于是写了一段Python脚本,支持在终端查看、添加、暂停、恢复、清理下载任务以及禁用或启用eMule,基本能够满足日常应用了。

  代码结构很简单,如何使用直接看源代码吧,不多废话了。

#! /usr/bin/env python
# -*- coding: utf-8 -*-

## 
# 使用Synology Download Station API远程控制文件下载
# http://download.synology.com/download/other/Synology_Download_Station_Official_API_V3.pdf
# Created by JinnLynn 2013.01.15 http://jeeker.net 
##

import json, urllib, urllib2, os, sys
from argparse import ArgumentParser
from pprint import pprint
# Python 2 only: re-import `sys` and switch the process-wide default string
# encoding to UTF-8.
# NOTE(review): presumably required so that non-ASCII task titles returned by
# the NAS can be formatted and printed without UnicodeDecodeError -- confirm
# before removing or porting to Python 3.
reload(sys)
sys.setdefaultencoding('utf8')

# Settings: NAS base URL, account name and password.
NAS_URL = ''
NAS_USR = ''
NAS_PWD = ''

# Human-readable descriptions keyed by numeric API error code.
# NOTE(review): these tables are not referenced anywhere else in this file;
# presumably they mirror the error tables of the official API document.
CommonCodeDesc = {
    100: 'Unknown error',
    101: 'Invalid parameter',
    102: 'The requested API does not exist',
    103: 'The requested method does not exist',
    104: 'The requested version does not support the functionality',
    105: 'The logged in session does not have permission',
    106: 'Session timeout',
    107: 'Session interrupted by duplicate login',
}

AuthCodeDesc = {
    400: 'No such account or incorrect password',
    401: 'Guest account disabled',
    402: 'Account disabled',
    403: 'Wrong password',
    404: 'Permission denied',
}

DSTaskCodeDesc = {
    400: 'File upload failed',
    401: 'Max number of tasks reached',
    402: 'Destination denied',
    403: 'Destination does not exist',
    404: 'Invalid task id',
    405: 'Invalid task action',
}

# Version string reported by the command-line `--version` switch.
DS_VERSION = '0.1'

class DownloadStation:
    """Small client for the Synology Download Station Web API.

    Creating an instance logs in immediately with the module-level
    NAS_URL / NAS_USR / NAS_PWD settings.  The command methods print their
    result to stdout and terminate the process through die() when the NAS
    reports a failure.
    """

    TASK_CGI = 'DownloadStation/task.cgi'

    def __init__(self):
        # Set up every attribute before the first network call so that the
        # helpers used during login can rely on them being present.
        self.sessionID = ''
        self.debugInfo = {}
        self.lastError = ''
        self.tryLogin()

    # ------------------------------------------------------------------
    # Low-level helpers
    # ------------------------------------------------------------------
    def _joinUrl(self, base, *parts):
        """Join URL segments with single '/' separators, skipping empty ones.

        os.path.join is deliberately not used: it would insert backslashes
        on Windows.
        """
        segments = [base.rstrip('/')]
        segments.extend(p.strip('/') for p in parts if p and p.strip('/'))
        return '/'.join(segments)

    def post(self, cgipath, data, subdir=''):
        """POST `data` to <NAS_URL>/webapi/<subdir>/<cgipath>.

        'version' (1) and '_sid' (the current session id) are filled in when
        the caller did not supply them; the caller's dict is left untouched.

        Returns the decoded JSON response.  When the request or the decoding
        fails, returns {'error': {'code': -1}, 'success': False} and stores
        the exception text in self.debugInfo['last_exception'].  Returns None
        (after printing 'data error') when `data` is not a dict.
        """
        if not isinstance(data, dict):
            print('data error')
            return
        payload = dict(data)
        payload.setdefault('version', 1)
        payload.setdefault('_sid', self.sessionID)
        url = self._joinUrl(NAS_URL, 'webapi', subdir, cgipath)
        try:
            response = urllib2.urlopen(url, urllib.urlencode(payload))
            try:
                return json.load(response)
            finally:
                response.close()
        except Exception as err:
            # Exception (not a bare except) so that Ctrl-C and sys.exit still
            # propagate; every other failure becomes the generic -1 result.
            self.debugInfo['last_exception'] = repr(err)
            return {'error': {'code': -1}, 'success': False}

    def _request(self, cgipath, data, subdir=''):
        """post() the request and die() unless the API reports success."""
        res = self.post(cgipath, data, subdir)
        if not self.isSuccess(res):
            self.die(self.lastError)
        return res

    def _listTasks(self, additional=''):
        """Return the task list, optionally asking for `additional` details."""
        data = {'api': 'SYNO.DownloadStation.Task', 'method': 'list'}
        if additional:
            data['additional'] = additional
        return self._request(self.TASK_CGI, data)['data']['tasks']

    def _batchAction(self, action, task_id, wanted, none_msg):
        """Run `action` ('pause' / 'resume') and print one line per task.

        When `task_id` is empty, every listed task whose status satisfies
        `wanted` is targeted; the process dies with `none_msg` if none match.
        """
        if not task_id:
            ids = [t['id'] for t in self._listTasks() if wanted(t['status'])]
            if not ids:
                self.die(none_msg)
            task_id = ','.join(ids)
        data = {'api': 'SYNO.DownloadStation.Task',
                'method': action,
                'id': task_id}
        res = self._request(self.TASK_CGI, data)
        for item in res['data']:
            if item['error'] == 0:
                info = '%s success:' % action
            else:
                info = '%s fail(%d):' % (action, item['error'])
            print('{:20}{}'.format(info, item['id']))

    # ------------------------------------------------------------------
    # Session handling
    # ------------------------------------------------------------------
    def login(self):
        """Log in and remember the returned session id; die() on failure."""
        data = {'api': 'SYNO.API.Auth',
                'method': 'login',
                'version': 2,
                'account': NAS_USR,
                'passwd': NAS_PWD,
                'session': 'DownloadStation'}
        res = self._request('auth.cgi', data)
        self.sessionID = res['data']['sid']

    def logout(self):
        """Send a logout request (result ignored) and forget the session id."""
        data = {'api': 'SYNO.API.Auth',
                'method': 'logout',
                'session': 'DownloadStation'}
        self.post('auth.cgi', data)
        self.sessionID = ''

    def tryLogin(self):
        """Log in only when no session id is held yet."""
        if self.sessionID == '':
            self.login()

    # ------------------------------------------------------------------
    # Commands
    # ------------------------------------------------------------------
    def test(self):
        """Query Download Station info and print its version string."""
        self.tryLogin()
        data = {'api': 'SYNO.DownloadStation.Info',
                'method': 'getinfo'}
        info = self._request('DownloadStation/info.cgi', data)['data']
        manager_info = ', and you are manager.' if info['is_manager'] else '.'
        print('everything is ok, the version of Download Station is {}{}'.format(
            info['version_string'], manager_info))

    def showStatistic(self):
        """Print the current download / upload speed."""
        self.tryLogin()
        data = {'api': 'SYNO.DownloadStation.Statistic',
                'method': 'getinfo'}
        stat = self._request('statistic.cgi', data, 'DownloadStation')['data']
        line = '{:15} download {:10} upload {:10}'
        print(line.format('Speed:',
                          self.humanSize(stat['speed_download']),
                          self.humanSize(stat['speed_upload'])))
        # The eMule figures are only present in the response when available.
        if 'emule_speed_download' in stat:
            print(line.format('eMule Speed:',
                              self.humanSize(stat['emule_speed_download']),
                              self.humanSize(stat['emule_speed_upload'])))
        else:
            print('eMule disabled.')

    def showTask(self, simple = False, include_seeding = False):
        """Print the task list.

        simple          -- only print task id and title (and skip the speed
                           summary).
        include_seeding -- also list tasks whose status is 'seeding'.
        """
        self.tryLogin()
        if not simple:
            self.showStatistic()
        tasks = self._listTasks('transfer')
        if not include_seeding:
            tasks = [t for t in tasks if t['status'] != 'seeding']
        if not tasks:
            print('Task list is empty.')
            return
        if simple:
            print('{:40}{}'.format('Task ID', 'Title'))
            for task in tasks:
                print('{:40}{}'.format(task['id'], task['title']))
            return
        print('{:10}{:15}{:10}{:15}{:15}{}'.format(
            'Type', 'Status', 'Size', 'Downloaded', 'Speed', 'Title'))
        for task in tasks:
            transfer = task['additional']['transfer']
            print('{:10}{:15}{:10}{:15}{:15}{:30}'.format(
                task['type'],
                task['status'],
                self.humanSize(task['size']),
                self.humanSize(transfer['size_downloaded']),
                self.humanSize(transfer['speed_download']),
                task['title']))

    def createTask(self, link_or_file):
        """Create download task(s).

        link_or_file -- a link, a list of links, or the path of a local text
                        file holding one link per line.
        """
        self.tryLogin()
        if isinstance(link_or_file, list):
            uris = ','.join(link_or_file)
        elif isinstance(link_or_file, (str, unicode)):
            uris = link_or_file
        else:
            self.die('arguments error')
        # The API's own `file` parameter seemed unreliable, so a local file
        # is read here and its lines are submitted as a URI list instead.
        try:
            if os.path.isfile(uris):
                with open(uris, 'r') as fp:
                    lines = [line.strip() for line in fp]
                # Skip blank lines so no empty URI entries are submitted.
                uris = ','.join(line for line in lines if line)
        except (IOError, OSError, TypeError) as err:
            # Best effort: report the problem, then submit the argument as-is.
            sys.stderr.write('cannot read link file: %s\n' % err)
        data = {'api': 'SYNO.DownloadStation.Task',
                'method': 'create',
                'uri': uris}
        self._request(self.TASK_CGI, data)
        print('task created.')

    def cleanTask(self):
        """Delete every task whose status is 'error' or 'finished'.

        Note: eMule tasks in 'seeding' state cannot be deleted.
        """
        self.tryLogin()
        need_clean = {t['id']: t['title'] for t in self._listTasks()
                      if t['status'] in ('error', 'finished')}
        if not need_clean:
            print('there is no task need to clean.')
            return
        data = {'api': 'SYNO.DownloadStation.Task',
                'method': 'delete',
                'id': ','.join(need_clean)}
        res = self._request(self.TASK_CGI, data)
        for item in res['data']:
            if item['error'] == 0:
                clean_res = 'clean success:'
            else:
                clean_res = 'clean fail(%d):' % item['error']
            # Fall back to the id when the NAS reports a task we did not ask for.
            title = need_clean.get(item['id'], item['id'])
            print('{:20}{}'.format(clean_res, title))

    def pauseTask(self, task_id = ''):
        """Pause `task_id`; without an id, pause every non-seeding task."""
        self.tryLogin()
        # Seeding eMule tasks cannot be paused, so they are left out.
        self._batchAction('pause', task_id,
                          lambda status: status != 'seeding',
                          'there is no task can be paused.')

    def resumeTask(self, task_id = ''):
        """Resume `task_id`; without an id, resume every paused task."""
        self.tryLogin()
        self._batchAction('resume', task_id,
                          lambda status: status == 'paused',
                          'there is no task can be resumed.')

    def deleteTask(self, task_id):
        """Delete `task_id`, asking for confirmation if it is still active."""
        self.tryLogin()
        data = {'api': 'SYNO.DownloadStation.Task',
                'method': 'getinfo',
                'id': task_id}
        tasks = self._request(self.TASK_CGI, data)['data']['tasks']
        if not tasks:
            self.die('task is non-existent.')
        # Tasks that neither finished nor failed need an explicit "yes".
        need_confirm = False
        for task in tasks:
            if task['status'] not in ('error', 'finished'):
                need_confirm = True
                print('{:20}{:40}{}'.format('task uncompleted:', task['id'], task['title']))
        if need_confirm:
            if raw_input('are you sure to delete it(yes/no)?:') != 'yes':
                return
        data = {'api': 'SYNO.DownloadStation.Task',
                'method': 'delete',
                'id': task_id}
        self._request(self.TASK_CGI, data)
        print('task deleted.')

    def eMule(self, is_enable):
        """Enable (truthy `is_enable`) or disable eMule."""
        self.tryLogin()
        data = {'api': 'SYNO.DownloadStation.Info',
                'method': 'setserverconfig',
                'emule_enabled': 'true' if is_enable else 'false'}
        self._request('DownloadStation/info.cgi', data)
        print('eMule was %s' % ('enabled' if is_enable else 'disabled'))

    # ------------------------------------------------------------------
    # Utilities
    # ------------------------------------------------------------------
    def isSuccess(self, res):
        """Return True when `res` reports success; record lastError otherwise.

        lastError becomes '' on success, 'Fail. error code: <code>' when the
        response carries an error code, and 'something error' in every other
        case (including a response that is not a dict).
        """
        if not isinstance(res, dict) or 'success' not in res:
            self.lastError = 'something error'
            return False
        if res['success']:
            self.lastError = ''
            return True
        error = res.get('error')
        if isinstance(error, dict) and 'code' in error:
            # %s instead of %d: a non-integer code must not raise here.
            self.lastError = 'Fail. error code: %s' % error['code']
        else:
            self.lastError = 'something error'
        return False

    def humanSize(self, byte):
        """Format a byte count as '<n.nn>KB', '<n.nn>MB' or '<n.nn>GB'.

        `byte` may be a number or a numeric string; anything that cannot be
        converted to an integer is treated as 0.
        """
        if not isinstance(byte, (int, float)):
            try:
                byte = int(byte)
            except (TypeError, ValueError):
                byte = 0
        size = byte / 1024.0
        unit = 'KB'
        for bigger in ('MB', 'GB'):
            # `<` (not `<=`) so that exactly 1024 of a unit rolls over.
            if size < 1024:
                break
            size /= 1024.0
            unit = bigger
        return '{:.2f}{}'.format(size, unit)

    def die(self, msg=''):
        """Write `msg` (if any) to stderr and exit with status 1."""
        if msg:
            sys.stderr.write(msg + '\n')
        sys.exit(1)

def _build_parser():
    """Build the `ds` command-line parser with one sub-command per action."""
    parser = ArgumentParser(prog = 'ds', description = 'Synology NAS Download Station Tool.')
    parser.add_argument('-v', '--version', action='version', version='%(prog)s ' + DS_VERSION)
    subparsers = parser.add_subparsers(title = 'subcommands', dest = 'sub_cmd')

    # Task information
    sub_info_parser = subparsers.add_parser('info', help = 'show task infomation')
    sub_info_parser.add_argument('--simple', action='store_true', default = False, help = 'only show task id and title')
    sub_info_parser.add_argument('--all', action='store_true', default = False, help = 'show all task')

    # Create a new task
    sub_create_parser = subparsers.add_parser('create', help='create new task')
    sub_create_parser.add_argument('link_or_file', nargs = 1, help='single link or a file which include multi-links')

    # Clean up tasks
    subparsers.add_parser('clean', help = 'clean error or completed task')

    # Pause task(s)
    sub_pause_parser = subparsers.add_parser('pause', help = 'pause task')
    sub_pause_parser.add_argument('task_id', action='store', nargs='?', default = '')

    # Resume task(s)
    sub_resume_parser = subparsers.add_parser('resume', help = 'resume task')
    sub_resume_parser.add_argument('task_id', nargs = '?', default = '')

    # Delete a task
    sub_delete_parser = subparsers.add_parser('delete', help = 'delete task')
    sub_delete_parser.add_argument('task_id')

    # eMule switch
    sub_emule_parser = subparsers.add_parser('emule', help = 'enable or disable eMule')
    sub_emule_parser.add_argument('operate', choices=['on', 'off'])

    # API self-test
    subparsers.add_parser('test', help = 'test API')

    return parser


def main():
    """Command-line entry point: parse arguments and run one sub-command.

    Without any argument the `info` sub-command is run.  The NAS session is
    always logged out again, even when the command terminates the process
    through sys.exit or is interrupted.
    """
    parser = _build_parser()
    # No arguments at all: default to showing the task information.
    args = parser.parse_args(['info']) if len(sys.argv) == 1 else parser.parse_args()

    # Each handler receives the client explicitly instead of closing over a
    # variable that is only assigned later.
    cmd_map = {
        'info':   lambda ds: ds.showTask(args.simple, args.all),
        'create': lambda ds: ds.createTask(args.link_or_file),
        'clean':  lambda ds: ds.cleanTask(),
        'pause':  lambda ds: ds.pauseTask(args.task_id),
        'resume': lambda ds: ds.resumeTask(args.task_id),
        'delete': lambda ds: ds.deleteTask(args.task_id),
        'emule':  lambda ds: ds.eMule(args.operate == 'on'),
        'test':   lambda ds: ds.test(),
    }
    handler = cmd_map.get(args.sub_cmd)
    if handler is None:
        print('something error.')
        return

    ds = DownloadStation()
    try:
        handler(ds)
    finally:
        # Runs on normal return, on sys.exit() raised by die() and on
        # Ctrl-C, so the session opened by the constructor is not left behind.
        ds.logout()

if __name__ == '__main__':
    main()

Source on GitHub