File: //lib/python3/dist-packages/breezy/transport/gio_transport.py
# Copyright (C) 2010 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#
# Author: Mattias Eriksson
"""Implementation of Transport over gio.
Written by Mattias Eriksson <snaggen@acc.umu.se> based on the ftp transport.
It provides the gio+XXX:// protocols where XXX is any of the protocols
supported by gio.
"""
from __future__ import absolute_import
from io import BytesIO
import os
import random
import stat
import time
try:
from urllib.parse import (
urlparse,
urlunparse,
)
except ImportError:
from urlparse import (
urlparse,
urlunparse,
)
from .. import (
config,
errors,
osutils,
urlutils,
debug,
ui,
)
from ..sixish import (
text_type,
)
from ..trace import mutter
from . import (
FileStream,
ConnectedTransport,
_file_streams,
)
from ..tests.test_server import TestServer
try:
import glib
except ImportError as e:
raise errors.DependencyNotPresent('glib', e)
try:
import gio
except ImportError as e:
raise errors.DependencyNotPresent('gio', e)
class GioLocalURLServer(TestServer):
"""A pretend server for local transports, using file:// urls.
Of course no actual server is required to access the local filesystem, so
this just exists to tell the test code how to get to it.
"""
def start_server(self):
pass
def get_url(self):
"""See Transport.Server.get_url."""
return "gio+" + urlutils.local_path_to_url('')
class GioFileStream(FileStream):
"""A file stream object returned by open_write_stream.
This version uses GIO to perform writes.
"""
def __init__(self, transport, relpath):
FileStream.__init__(self, transport, relpath)
self.gio_file = transport._get_GIO(relpath)
self.stream = self.gio_file.create()
def _close(self):
self.stream.close()
def write(self, bytes):
try:
# Using pump_string_file seems to make things crash
osutils.pumpfile(BytesIO(bytes), self.stream)
except gio.Error as e:
# self.transport._translate_gio_error(e,self.relpath)
raise errors.BzrError(str(e))
class GioStatResult(object):
def __init__(self, f):
info = f.query_info('standard::size,standard::type')
self.st_size = info.get_size()
type = info.get_file_type()
if (type == gio.FILE_TYPE_REGULAR):
self.st_mode = stat.S_IFREG
elif type == gio.FILE_TYPE_DIRECTORY:
self.st_mode = stat.S_IFDIR
class GioTransport(ConnectedTransport):
"""This is the transport agent for gio+XXX:// access."""
def __init__(self, base, _from_transport=None):
"""Initialize the GIO transport and make sure the url is correct."""
if not base.startswith('gio+'):
raise ValueError(base)
(scheme, netloc, path, params, query, fragment) = \
urlparse(base[len('gio+'):], allow_fragments=False)
if '@' in netloc:
user, netloc = netloc.rsplit('@', 1)
# Seems it is not possible to list supported backends for GIO
# so a hardcoded list it is then.
gio_backends = ['dav', 'file', 'ftp', 'obex', 'sftp', 'ssh', 'smb']
if scheme not in gio_backends:
raise urlutils.InvalidURL(base,
extra="GIO support is only available for " +
', '.join(gio_backends))
# Remove the username and password from the url we send to GIO
# by rebuilding the url again.
u = (scheme, netloc, path, '', '', '')
self.url = urlunparse(u)
# And finally initialize super
super(GioTransport, self).__init__(base,
_from_transport=_from_transport)
def _relpath_to_url(self, relpath):
full_url = urlutils.join(self.url, relpath)
if isinstance(full_url, text_type):
raise urlutils.InvalidURL(full_url)
return full_url
def _get_GIO(self, relpath):
"""Return the ftplib.GIO instance for this object."""
# Ensures that a connection is established
connection = self._get_connection()
if connection is None:
# First connection ever
connection, credentials = self._create_connection()
self._set_connection(connection, credentials)
fileurl = self._relpath_to_url(relpath)
file = gio.File(fileurl)
return file
def _auth_cb(self, op, message, default_user, default_domain, flags):
# really use breezy.auth get_password for this
# or possibly better gnome-keyring?
auth = config.AuthenticationConfig()
parsed_url = urlutils.URL.from_string(self.url)
user = None
if (flags & gio.ASK_PASSWORD_NEED_USERNAME and
flags & gio.ASK_PASSWORD_NEED_DOMAIN):
prompt = (u'%s' % (parsed_url.scheme.upper(),) +
u' %(host)s DOMAIN\\username')
user_and_domain = auth.get_user(parsed_url.scheme,
parsed_url.host, port=parsed_url.port, ask=True,
prompt=prompt)
(domain, user) = user_and_domain.split('\\', 1)
op.set_username(user)
op.set_domain(domain)
elif flags & gio.ASK_PASSWORD_NEED_USERNAME:
user = auth.get_user(parsed_url.scheme, parsed_url.host,
port=parsed_url.port, ask=True)
op.set_username(user)
elif flags & gio.ASK_PASSWORD_NEED_DOMAIN:
# Don't know how common this case is, but anyway
# a DOMAIN and a username prompt should be the
# same so I will missuse the ui_factory get_username
# a little bit here.
prompt = (u'%s' % (parsed_url.scheme.upper(),) +
u' %(host)s DOMAIN')
domain = ui.ui_factory.get_username(prompt=prompt)
op.set_domain(domain)
if flags & gio.ASK_PASSWORD_NEED_PASSWORD:
if user is None:
user = op.get_username()
password = auth.get_password(parsed_url.scheme, parsed_url.host,
user, port=parsed_url.port)
op.set_password(password)
op.reply(gio.MOUNT_OPERATION_HANDLED)
def _mount_done_cb(self, obj, res):
try:
obj.mount_enclosing_volume_finish(res)
self.loop.quit()
except gio.Error as e:
self.loop.quit()
raise errors.BzrError(
"Failed to mount the given location: " + str(e))
def _create_connection(self, credentials=None):
if credentials is None:
user, password = self._parsed_url.user, self._parsed_url.password
else:
user, password = credentials
try:
connection = gio.File(self.url)
mount = None
try:
mount = connection.find_enclosing_mount()
except gio.Error as e:
if (e.code == gio.ERROR_NOT_MOUNTED):
self.loop = glib.MainLoop()
ui.ui_factory.show_message('Mounting %s using GIO' %
self.url)
op = gio.MountOperation()
if user:
op.set_username(user)
if password:
op.set_password(password)
op.connect('ask-password', self._auth_cb)
m = connection.mount_enclosing_volume(op,
self._mount_done_cb)
self.loop.run()
except gio.Error as e:
raise errors.TransportError(msg="Error setting up connection:"
" %s" % str(e), orig_error=e)
return connection, (user, password)
def disconnect(self):
# FIXME: Nothing seems to be necessary here, which sounds a bit strange
# -- vila 20100601
pass
def _reconnect(self):
# FIXME: This doesn't seem to be used -- vila 20100601
"""Create a new connection with the previously used credentials"""
credentials = self._get_credentials()
connection, credentials = self._create_connection(credentials)
self._set_connection(connection, credentials)
def _remote_path(self, relpath):
return self._parsed_url.clone(relpath).path
def has(self, relpath):
"""Does the target location exist?"""
try:
if 'gio' in debug.debug_flags:
mutter('GIO has check: %s' % relpath)
f = self._get_GIO(relpath)
st = GioStatResult(f)
if stat.S_ISREG(st.st_mode) or stat.S_ISDIR(st.st_mode):
return True
return False
except gio.Error as e:
if e.code == gio.ERROR_NOT_FOUND:
return False
else:
self._translate_gio_error(e, relpath)
def get(self, relpath, retries=0):
"""Get the file at the given relative path.
:param relpath: The relative path to the file
:param retries: Number of retries after temporary failures so far
for this operation.
We're meant to return a file-like object which bzr will
then read from. For now we do this via the magic of BytesIO
"""
try:
if 'gio' in debug.debug_flags:
mutter("GIO get: %s" % relpath)
f = self._get_GIO(relpath)
fin = f.read()
buf = fin.read()
fin.close()
return BytesIO(buf)
except gio.Error as e:
# If we get a not mounted here it might mean
# that a bad path has been entered (or that mount failed)
if (e.code == gio.ERROR_NOT_MOUNTED):
raise errors.PathError(relpath,
extra='Failed to get file, make sure the path is correct. '
+ str(e))
else:
self._translate_gio_error(e, relpath)
def put_file(self, relpath, fp, mode=None):
"""Copy the file-like object into the location.
:param relpath: Location to put the contents, relative to base.
:param fp: File-like or string object.
"""
if 'gio' in debug.debug_flags:
mutter("GIO put_file %s" % relpath)
tmppath = '%s.tmp.%.9f.%d.%d' % (relpath, time.time(),
os.getpid(), random.randint(0, 0x7FFFFFFF))
f = None
fout = None
try:
closed = True
try:
f = self._get_GIO(tmppath)
fout = f.create()
closed = False
length = self._pump(fp, fout)
fout.close()
closed = True
self.stat(tmppath)
dest = self._get_GIO(relpath)
f.move(dest, flags=gio.FILE_COPY_OVERWRITE)
f = None
if mode is not None:
self._setmode(relpath, mode)
return length
except gio.Error as e:
self._translate_gio_error(e, relpath)
finally:
if not closed and fout is not None:
fout.close()
if f is not None and f.query_exists():
f.delete()
def mkdir(self, relpath, mode=None):
"""Create a directory at the given path."""
try:
if 'gio' in debug.debug_flags:
mutter("GIO mkdir: %s" % relpath)
f = self._get_GIO(relpath)
f.make_directory()
self._setmode(relpath, mode)
except gio.Error as e:
self._translate_gio_error(e, relpath)
def open_write_stream(self, relpath, mode=None):
"""See Transport.open_write_stream."""
if 'gio' in debug.debug_flags:
mutter("GIO open_write_stream %s" % relpath)
if mode is not None:
self._setmode(relpath, mode)
result = GioFileStream(self, relpath)
_file_streams[self.abspath(relpath)] = result
return result
def recommended_page_size(self):
"""See Transport.recommended_page_size().
For FTP we suggest a large page size to reduce the overhead
introduced by latency.
"""
if 'gio' in debug.debug_flags:
mutter("GIO recommended_page")
return 64 * 1024
def rmdir(self, relpath):
"""Delete the directory at rel_path"""
try:
if 'gio' in debug.debug_flags:
mutter("GIO rmdir %s" % relpath)
st = self.stat(relpath)
if stat.S_ISDIR(st.st_mode):
f = self._get_GIO(relpath)
f.delete()
else:
raise errors.NotADirectory(relpath)
except gio.Error as e:
self._translate_gio_error(e, relpath)
except errors.NotADirectory as e:
# just pass it forward
raise e
except Exception as e:
mutter('failed to rmdir %s: %s' % (relpath, e))
raise errors.PathError(relpath)
def append_file(self, relpath, file, mode=None):
"""Append the text in the file-like object into the final
location.
"""
# GIO append_to seems not to append but to truncate
# Work around this.
if 'gio' in debug.debug_flags:
mutter("GIO append_file: %s" % relpath)
tmppath = '%s.tmp.%.9f.%d.%d' % (relpath, time.time(),
os.getpid(), random.randint(0, 0x7FFFFFFF))
try:
result = 0
fo = self._get_GIO(tmppath)
fi = self._get_GIO(relpath)
fout = fo.create()
try:
info = GioStatResult(fi)
result = info.st_size
fin = fi.read()
self._pump(fin, fout)
fin.close()
# This separate except is to catch and ignore the
# gio.ERROR_NOT_FOUND for the already existing file.
# It is valid to open a non-existing file for append.
# This is caused by the broken gio append_to...
except gio.Error as e:
if e.code != gio.ERROR_NOT_FOUND:
self._translate_gio_error(e, relpath)
length = self._pump(file, fout)
fout.close()
info = GioStatResult(fo)
if info.st_size != result + length:
raise errors.BzrError("Failed to append size after "
"(%d) is not original (%d) + written (%d) total (%d)" %
(info.st_size, result, length, result + length))
fo.move(fi, flags=gio.FILE_COPY_OVERWRITE)
return result
except gio.Error as e:
self._translate_gio_error(e, relpath)
def _setmode(self, relpath, mode):
"""Set permissions on a path.
Only set permissions on Unix systems
"""
if 'gio' in debug.debug_flags:
mutter("GIO _setmode %s" % relpath)
if mode:
try:
f = self._get_GIO(relpath)
f.set_attribute_uint32(gio.FILE_ATTRIBUTE_UNIX_MODE, mode)
except gio.Error as e:
if e.code == gio.ERROR_NOT_SUPPORTED:
# Command probably not available on this server
mutter("GIO Could not set permissions to %s on %s. %s",
oct(mode), self._remote_path(relpath), str(e))
else:
self._translate_gio_error(e, relpath)
def rename(self, rel_from, rel_to):
"""Rename without special overwriting"""
try:
if 'gio' in debug.debug_flags:
mutter("GIO move (rename): %s => %s", rel_from, rel_to)
f = self._get_GIO(rel_from)
t = self._get_GIO(rel_to)
f.move(t)
except gio.Error as e:
self._translate_gio_error(e, rel_from)
def move(self, rel_from, rel_to):
"""Move the item at rel_from to the location at rel_to"""
try:
if 'gio' in debug.debug_flags:
mutter("GIO move: %s => %s", rel_from, rel_to)
f = self._get_GIO(rel_from)
t = self._get_GIO(rel_to)
f.move(t, flags=gio.FILE_COPY_OVERWRITE)
except gio.Error as e:
self._translate_gio_error(e, relfrom)
def delete(self, relpath):
"""Delete the item at relpath"""
try:
if 'gio' in debug.debug_flags:
mutter("GIO delete: %s", relpath)
f = self._get_GIO(relpath)
f.delete()
except gio.Error as e:
self._translate_gio_error(e, relpath)
def external_url(self):
"""See breezy.transport.Transport.external_url."""
if 'gio' in debug.debug_flags:
mutter("GIO external_url", self.base)
# GIO external url
return self.base
def listable(self):
"""See Transport.listable."""
if 'gio' in debug.debug_flags:
mutter("GIO listable")
return True
def list_dir(self, relpath):
"""See Transport.list_dir."""
if 'gio' in debug.debug_flags:
mutter("GIO list_dir")
try:
entries = []
f = self._get_GIO(relpath)
children = f.enumerate_children(gio.FILE_ATTRIBUTE_STANDARD_NAME)
for child in children:
entries.append(urlutils.escape(child.get_name()))
return entries
except gio.Error as e:
self._translate_gio_error(e, relpath)
def iter_files_recursive(self):
"""See Transport.iter_files_recursive.
This is cargo-culted from the SFTP transport"""
if 'gio' in debug.debug_flags:
mutter("GIO iter_files_recursive")
queue = list(self.list_dir("."))
while queue:
relpath = queue.pop(0)
st = self.stat(relpath)
if stat.S_ISDIR(st.st_mode):
for i, basename in enumerate(self.list_dir(relpath)):
queue.insert(i, relpath + "/" + basename)
else:
yield relpath
def stat(self, relpath):
"""Return the stat information for a file."""
try:
if 'gio' in debug.debug_flags:
mutter("GIO stat: %s", relpath)
f = self._get_GIO(relpath)
return GioStatResult(f)
except gio.Error as e:
self._translate_gio_error(e, relpath, extra='error w/ stat')
def lock_read(self, relpath):
"""Lock the given file for shared (read) access.
:return: A lock object, which should be passed to Transport.unlock()
"""
if 'gio' in debug.debug_flags:
mutter("GIO lock_read", relpath)
class BogusLock(object):
# The old RemoteBranch ignore lock for reading, so we will
# continue that tradition and return a bogus lock object.
def __init__(self, path):
self.path = path
def unlock(self):
pass
return BogusLock(relpath)
def lock_write(self, relpath):
"""Lock the given file for exclusive (write) access.
WARNING: many transports do not support this, so trying avoid using it
:return: A lock object, whichshould be passed to Transport.unlock()
"""
if 'gio' in debug.debug_flags:
mutter("GIO lock_write", relpath)
return self.lock_read(relpath)
def _translate_gio_error(self, err, path, extra=None):
if 'gio' in debug.debug_flags:
mutter("GIO Error: %s %s" % (str(err), path))
if extra is None:
extra = str(err)
if err.code == gio.ERROR_NOT_FOUND:
raise errors.NoSuchFile(path, extra=extra)
elif err.code == gio.ERROR_EXISTS:
raise errors.FileExists(path, extra=extra)
elif err.code == gio.ERROR_NOT_DIRECTORY:
raise errors.NotADirectory(path, extra=extra)
elif err.code == gio.ERROR_NOT_EMPTY:
raise errors.DirectoryNotEmpty(path, extra=extra)
elif err.code == gio.ERROR_BUSY:
raise errors.ResourceBusy(path, extra=extra)
elif err.code == gio.ERROR_PERMISSION_DENIED:
raise errors.PermissionDenied(path, extra=extra)
elif err.code == gio.ERROR_HOST_NOT_FOUND:
raise errors.PathError(path, extra=extra)
elif err.code == gio.ERROR_IS_DIRECTORY:
raise errors.PathError(path, extra=extra)
else:
mutter('unable to understand error for path: %s: %s', path, err)
raise errors.PathError(path,
extra="Unhandled gio error: " + str(err))
def get_test_permutations():
"""Return the permutations to be used in testing."""
from breezy.tests import test_server
return [(GioTransport, GioLocalURLServer)]