#coding: UTF-8
"""
Test file/dir operations.
"""

import posixpath
import pytest
import urllib.request, urllib.parse, urllib.error
from urllib.parse import urlencode, quote
import urllib.parse
from nose.tools import assert_in

from tests.common.utils import randstring, urljoin
from tests.api.apitestbase import ApiTestBase
from tests.api.urls import REPOS_URL

class FilesApiTest(ApiTestBase):
    def test_rename_file(self):
        with self.get_tmp_repo() as repo:
            name, furl = self.create_file(repo)
            data = {
                'operation': 'rename',
                'newname': name + randstring(),
            }
            res = self.post(furl, data=data)
            self.assertRegex(res.text, r'"http(.*)"')

    def test_remove_file(self):
        with self.get_tmp_repo() as repo:
            _, furl = self.create_file(repo)
            res = self.delete(furl)
            self.assertEqual(res.text, '"success"')

    def test_move_file(self):
        with self.get_tmp_repo() as repo:
            # TODO: create another repo here, and use it as dst_repo

            # create sub folder(dpath)
            dpath, _ = self.create_dir(repo)

            # create tmp file in sub folder(dpath)
            tmp_file = 'tmp_file.txt'
            file_path = dpath + '/' + tmp_file
            furl = repo.get_filepath_url(file_path)
            data = {'operation': 'create'}
            res = self.post(furl, data=data, expected=201)

            # copy tmp file from sub folder(dpath) to dst dir('/')
            data = {
                'dst_repo': repo.repo_id,
                'dst_dir': '/',
                'operation': 'copy',
            }
            u = urllib.parse.urlparse(furl)
            parsed_furl = urllib.parse.urlunparse((u.scheme, u.netloc, u.path, '', '', ''))
            res = self.post(parsed_furl+ '?p=' + quote(file_path), data=data)
            assert_in(tmp_file, res.text)

            # get info of copied file in dst dir('/')
            fdurl = repo.file_url + 'detail/?p=/%s' % quote(tmp_file)
            detail = self.get(fdurl).json()
            self.assertIsNotNone(detail)
            self.assertIsNotNone(detail['id'])

            # copy tmp file from sub folder(dpath) to dst dir('/') again
            # for test can rename file if a file with the same name is dst dir
            data = {
                'dst_repo': repo.repo_id,
                'dst_dir': '/',
                'operation': 'copy',
            }
            u = urllib.parse.urlparse(furl)
            parsed_furl = urllib.parse.urlunparse((u.scheme, u.netloc, u.path, '', '', ''))
            res = self.post(parsed_furl+ '?p=' + quote(file_path), data=data)
            assert_in('tmp_file (1).txt', res.text)

            # copy tmp file from sub folder(dpath) to dst dir('/') again
            # for test can rename file if a file with the same name is dst dir
            data = {
                'dst_repo': repo.repo_id,
                'dst_dir': '/',
                'operation': 'copy',
            }
            u = urllib.parse.urlparse(furl)
            parsed_furl = urllib.parse.urlunparse((u.scheme, u.netloc, u.path, '', '', ''))
            res = self.post(parsed_furl+ '?p=' + quote(file_path), data=data)
            assert_in('tmp_file (2).txt', res.text)

            # then move file to dst dir
            data = {
                'dst_repo': repo.repo_id,
                'dst_dir': '/',
                'operation': 'move',
            }
            u = urllib.parse.urlparse(furl)
            parsed_furl = urllib.parse.urlunparse((u.scheme, u.netloc, u.path, '', '', ''))
            res = self.post(parsed_furl+ '?p=' + quote(file_path), data=data)
            assert_in('tmp_file%20%283%29.txt', res.text)


    def test_copy_file(self):
        with self.get_tmp_repo() as repo:
            # TODO: create another repo here, and use it as dst_repo

            # create sub folder(dpath)
            dpath, _ = self.create_dir(repo)

            # create tmp file in sub folder(dpath)
            tmp_file = 'tmp_file.txt'
            file_path = dpath + '/' + tmp_file
            furl = repo.get_filepath_url(file_path)
            data = {'operation': 'create'}
            res = self.post(furl, data=data, expected=201)

            # copy tmp file from sub folder(dpath) to dst dir('/')
            data = {
                'dst_repo': repo.repo_id,
                'dst_dir': '/',
                'operation': 'copy',
            }
            u = urllib.parse.urlparse(furl)
            parsed_furl = urllib.parse.urlunparse((u.scheme, u.netloc, u.path, '', '', ''))
            res = self.post(parsed_furl+ '?p=' + quote(file_path), data=data)
            assert_in(tmp_file, res.text)

            # get info of copied file in dst dir('/')
            fdurl = repo.file_url + 'detail/?p=/%s' % quote(tmp_file)
            detail = self.get(fdurl).json()
            self.assertIsNotNone(detail)
            self.assertIsNotNone(detail['id'])

            # copy tmp file from sub folder(dpath) to dst dir('/') again
            # for test can rename file if a file with the same name is dst dir
            data = {
                'dst_repo': repo.repo_id,
                'dst_dir': '/',
                'operation': 'copy',
            }
            u = urllib.parse.urlparse(furl)
            parsed_furl = urllib.parse.urlunparse((u.scheme, u.netloc, u.path, '', '', ''))
            res = self.post(parsed_furl+ '?p=' + quote(file_path), data=data)
            assert_in('tmp_file (1).txt', res.text)

            # copy tmp file from sub folder(dpath) to dst dir('/') again
            # for test can rename file if a file with the same name is dst dir
            data = {
                'dst_repo': repo.repo_id,
                'dst_dir': '/',
                'operation': 'copy',
            }
            u = urllib.parse.urlparse(furl)
            parsed_furl = urllib.parse.urlunparse((u.scheme, u.netloc, u.path, '', '', ''))
            res = self.post(parsed_furl+ '?p=' + quote(file_path), data=data)
            assert_in('tmp_file (2).txt', res.text)

    def test_download_file(self):
        with self.get_tmp_repo() as repo:
            fname, furl = self.create_file(repo)
            res = self.get(furl)
            self.assertRegex(res.text, '"http(.*)/%s"' % quote(fname))

    def test_download_file_without_reuse_token(self):
        with self.get_tmp_repo() as repo:
            fname, furl = self.create_file(repo)
            res = self.get(furl)
            self.assertRegex(res.text, '"http(.*)/%s"' % quote(fname))

            # download for the first time
            url = urllib.request.urlopen(res.text.strip('"'))
            code = url.getcode()
            self.assertEqual(code, 200)

            # download for the second time
            try:
                url = urllib.request.urlopen(res.text.strip('"'))
            except Exception as e:
                assert 'HTTP Error 400: Bad Request' in str(e)

            # url = urllib.request.urlopen(res.text.strip('"'))
            # code = url.getcode()
            # self.assertEqual(code, 400)


    def test_download_file_with_reuse_token(self):
        with self.get_tmp_repo() as repo:
            fname, furl = self.create_file(repo)
            res = self.get(furl + '&reuse=1')
            self.assertRegex(res.text, '"http(.*)/%s"' % quote(fname))

            # download for the first time
            url = urllib.request.urlopen(res.text.strip('"'))
            code = url.getcode()
            self.assertEqual(code, 200)

            # download for the second time
            url = urllib.request.urlopen(res.text.strip('"'))
            code = url.getcode()
            self.assertEqual(code, 200)

    def test_download_file_from_history(self):
        with self.get_tmp_repo() as repo:
            fname, _ = self.create_file(repo)
            file_history_url = urljoin(repo.repo_url, 'history/') + \
                               '?p=/%s' % quote(fname)
            res = self.get(file_history_url).json()
            commit_id = res['commits'][0]['id']
            self.assertEqual(len(commit_id), 40)
            data = {
                'p': fname,
                'commit_id': commit_id,
            }
            query = '?' + urlencode(data)
            res = self.get(repo.file_url + query)
            self.assertRegex(res.text, r'"http(.*)/%s"' % quote(fname))

    def test_get_file_detail(self):
        with self.get_tmp_repo() as repo:
            fname, _ = self.create_file(repo)
            fdurl = repo.file_url + 'detail/?p=/%s' % quote(fname)
            detail = self.get(fdurl).json()
            self.assertIsNotNone(detail)
            self.assertIsNotNone(detail['id'])
            self.assertIsNotNone(detail['mtime'])
            self.assertIsNotNone(detail['type'])
            self.assertIsNotNone(detail['name'])
            self.assertIsNotNone(detail['size'])
            self.assertIsNotNone(detail['starred'])
            self.assertIsNotNone(detail['last_modifier_email'])
            self.assertIsNotNone(detail['last_modifier_name'])
            self.assertIsNotNone(detail['last_modifier_contact_email'])

    def test_get_file_history(self):
        with self.get_tmp_repo() as repo:
            fname, _ = self.create_file(repo)
            fhurl = repo.file_url + 'history/?p=%s' % quote(fname)
            history = self.get(fhurl).json()
            for commit in history['commits']:
                self.assertIsNotNone(commit['rev_file_size'])
                #self.assertIsNotNone(commit['rev_file_id']) #allow null
                self.assertIsNotNone(commit['ctime'])
                self.assertIsNotNone(commit['creator_name'])
                self.assertIsNotNone(commit['creator'])
                self.assertIsNotNone(commit['root_id'])
                #self.assertIsNotNone(commit['rev_renamed_old_path']) #allow null
                #self.assertIsNotNone(commit['parent_id']) #allow null
                self.assertIsNotNone(commit['new_merge'])
                self.assertIsNotNone(commit['repo_id'])
                self.assertIsNotNone(commit['desc'])
                self.assertIsNotNone(commit['id'])
                self.assertIsNotNone(commit['conflict'])

                assert commit['user_info']['email'] == commit['creator_name']
                self.assertIsNotNone(commit['user_info']['name'])
                self.assertIsNotNone(commit['user_info']['contact_email'])
                #self.assertIsNotNone(commit['second_parent_id']) #allow null

    def test_get_upload_link(self):
        with self.get_tmp_repo() as repo:
            upload_url = urljoin(repo.repo_url, 'upload-link')
            res = self.get(upload_url)
            self.assertRegex(res.text, r'"http(.*)/upload-api/[^/]+"')

    def test_get_upload_link_with_invalid_repo_id(self):
        repo_url = urljoin(REPOS_URL, '12345678-1234-1234-1234-12345678901b')
        upload_url = urljoin(repo_url, 'upload-link')
        self.get(upload_url, expected=404)

    def test_get_update_link(self):
        with self.get_tmp_repo() as repo:
            update_url = urljoin(repo.repo_url, 'update-link')
            res = self.get(update_url)
            self.assertRegex(res.text, r'"http(.*)/update-api/[^/]+"')

    def test_get_update_link_with_invalid_repo_id(self):
        repo_url = urljoin(REPOS_URL, '12345678-1234-1234-1234-12345678901b')
        update_url = urljoin(repo_url, 'update-link')
        self.get(update_url, expected=404)

    # def test_upload_file(self):
    #     # XXX: requests has problems when post a file whose name contains
    #     # non-ascii data
    #     fname = 'file-upload-test %s.txt' % randstring()
    #     furl = self.test_file_url + '?p=/%s' % quote(fname)
    #     self.delete(furl)
    #     upload_url = self.test_repo_url + u'upload-link/'
    #     res = self.get(upload_url)
    #     upload_api_url = re.match(r'"(.*)"', res.text).group(1)
    #     files = {
    #         'file': (fname, 'Some lines in this file'),
    #         'parent_dir': '/',
    #     }
    #     res = self.post(upload_api_url, files=files)
    #     self.assertRegexpMatches(res.text, r'\w{40,40}')

    # def test_update_file(self):
    #     fname = 'file-update-test %s.txt' % randstring()
    #     _, furl = self.create_file(fname=fname)
    #     update_url = self.test_repo_url + u'update-link/'
    #     res = self.get(update_url)
    #     update_api_url = re.match(r'"(.*)"', res.text).group(1)
    #     files = {
    #         'file': ('filename', 'Updated content of this file'),
    #         'target_file': '/test_update.c'
    #     }
    #     res = self.post(update_api_url, files=files)
    #     self.assertRegexpMatches(res.text, r'\w{40,40}')

    def test_get_upload_blocks_link(self):
        with self.get_tmp_repo() as repo:
            upload_blks_url = urljoin(repo.repo_url, 'upload-blks-link')
            res = self.get(upload_blks_url)
            self.assertRegex(res.text, r'"http(.*)/upload-blks-api/[^/]+"')

    def test_get_upload_blocks_link_with_invalid_repo_id(self):
        repo_url = urljoin(REPOS_URL, '12345678-1234-1234-1234-12345678901b')
        upload_blks_url = urljoin(repo_url, 'upload-blks-link')
        self.get(upload_blks_url, expected=404)

    def test_get_update_blocks_link(self):
        with self.get_tmp_repo() as repo:
            update_blks_url = urljoin(repo.repo_url, 'update-blks-link')
            res = self.get(update_blks_url)
            self.assertRegex(res.text, r'"http(.*)/update-blks-api/[^/]+"')

    def test_get_update_blocks_link_with_invalid_repo_id(self):
        repo_url = urljoin(REPOS_URL, '12345678-1234-1234-1234-12345678901b')
        update_blks_url = urljoin(repo_url, 'update-blks-link')
        self.get(update_blks_url, expected=404)

    def test_only_list_dir(self):
        with self.get_tmp_repo() as repo:
            self.create_file(repo)
            self.create_dir(repo)
            dirents = self.get(repo.dir_url + '?t=d').json()
            self.assertHasLen(dirents, 1)
            for dirent in dirents:
                self.assertIsNotNone(dirent['id'])
                self.assertIsNotNone(dirent['name'])
                self.assertEqual(dirent['type'], 'dir')

    def test_only_list_file(self):
        with self.get_tmp_repo() as repo:
            self.create_file(repo)
            self.create_dir(repo)
            dirents = self.get(repo.dir_url + '?t=f').json()
            self.assertHasLen(dirents, 1)
            for dirent in dirents:
                self.assertIsNotNone(dirent['id'])
                self.assertIsNotNone(dirent['name'])
                self.assertIsNotNone(dirent['size'])
                self.assertEqual(dirent['type'], 'file')

    def test_list_dir_and_file(self):
        with self.get_tmp_repo() as repo:
            self.create_file(repo)
            self.create_dir(repo)
            dirents = self.get(repo.dir_url).json()
            self.assertHasLen(dirents, 2)
            for dirent in dirents:
                self.assertIsNotNone(dirent['id'])
                self.assertIsNotNone(dirent['name'])
                self.assertIn(dirent['type'], ('file', 'dir'))
                if dirent['type'] == 'file':
                    self.assertIsNotNone(dirent['size'])

    def test_list_recursive_dir(self):
        with self.get_tmp_repo() as repo:
            # create test dir
            data = {'operation': 'mkdir'}
            dir_list = ['/1/', '/1/2/', '/1/2/3/', '/4/', '/4/5/', '/6/']
            for dpath in dir_list:
                durl = repo.get_dirpath_url(dpath)
                self.post(durl, data=data, expected=201)

            # get recursive dir
            dirents = self.get(repo.dir_url + '?t=d&recursive=1').json()
            self.assertHasLen(dirents, len(dir_list))
            for dirent in dirents:
                self.assertIsNotNone(dirent['id'])
                self.assertEqual(dirent['type'], 'dir')
                full_path = posixpath.join(dirent['parent_dir'], dirent['name']) + '/'
                self.assertIn(full_path, dir_list)

            # get recursive dir with files info
            # create test file
            tmp_file_name = '%s.txt' % randstring()
            self.create_file(repo, fname=tmp_file_name)

            dirents = self.get(repo.dir_url + '?recursive=1').json()
            self.assertHasLen(dirents, len(dir_list) + 1)
            for dirent in dirents:
                self.assertIsNotNone(dirent['id'])
                if dirent['type'] == 'dir':
                    full_dir_path = posixpath.join(dirent['parent_dir'], dirent['name']) + '/'
                    self.assertIn(full_dir_path, dir_list)
                else:
                    full_file_path = posixpath.join(dirent['parent_dir'], dirent['name'])
                    self.assertEqual('/' + tmp_file_name, full_file_path)

    def test_remove_dir(self):
        with self.get_tmp_repo() as repo:
            _, durl = self.create_dir(repo)
            res = self.delete(durl)
            self.assertEqual(res.text, '"success"')
            self.get(durl, expected=404)

    @pytest.mark.xfail
    def test_create_dir_with_parents(self):
        with self.get_tmp_repo() as repo:
            path = '/level1/level 2/level_3/目录4'
            self.create_dir_with_parents(repo, path)

    def create_dir_with_parents(self, repo, path):
        data = {'operation': 'mkdir', 'create_parents': 'true'}
        durl = repo.get_dirpath_url(path.encode('utf-8'))
        self.post(durl, data=data, expected=201)
        curpath = ''
        # check the parents are created along the way
        parts = path.split('/')
        for i, name in enumerate(parts):
            curpath += '/' + name
            url = repo.get_dirpath_url(curpath.encode('utf-8'))
            if i < len(parts) - 1:
                assert self.get(url).json()[0]['name'] == parts[i+1]
            else:
                assert self.get(url).json() == []
