Как‐то понадобился мне просмотр списка файлов в удалённом репозитории. Клонировать его при этом как‐то не очень хотелось. Поиск в интернете ожидаемо дал множество ответов вида «это невозможно, делайте клон». А мне‐то надо всего‐навсего убедиться, что по некоторой ссылке находится репозиторий, соответствующий некоторому архиву с исходными кодами. Так как «некоторая ссылка» находится на странице с описанием содержимого этого архива (точнее, дополнения в этом архиве), то мне показалось достаточным сравнить только список файлов. Как быть?
Конечно, Mercurial не предоставляет практически никаких возможностей работы с удалённым репозиторием. Точнее, можно сделать push и pull (ну и clone как частный случай последнего). Но можно ли сделать pull, не затрагивая при этом файловую систему? Ответ: можно, здесь нам поможет hg incoming
. Собственно, алгоритм работы такой:
- Создать где‐то новый пустой репозиторий. В пустой репозиторий можно делать
pull
из любого репозитория. - Используя
hg incoming
получить список изменений. Так какhg incoming
использует те же функции, что иhg log
, то мы не ограничены в возможностях изменения его вывода. В частности, можно получить список всех файлов, изменённых в каждой из ревизий, или даже сами изменения в форматеunified diff
(с расширениямиgit
для бинарных файлов). Diff нам не нужен, а вот список всех изменённых файлов — пригодится. - Так как мы получаем все ревизии, то по ходу дела можно в дополнение к списку родилей к каждому изменению присоединить и список детей. Отсутствие детей, которые не являются предками ревизии, список файлов в которой нас интересует, нас не волнует.
- У mercurial есть одна ревизия, которая обязательно присутствует в любом репозитории и при том является единственной, реально не имеющей ни одного родителя:
-1:0000000000000000000000000000000000000000
. Это хорошая начальная точка.
Начиная с данной ревизии найдём список файлов во всех остальных ревизиях (список файлов в начальной ревизии известен: он пуст). Для этого- Для каждой ревизии, кроме начальной, возьмём список файлов из первого родителя. Ревизии обходятся от родителей к детям.
- Добавим в этот список список добавленных файлов (его вы получите, если используете
hg incoming --format xml --verbose
: в тёгеpaths
). - Удалим из этого списка список удалённых файлов (получается там же).
- Теперь найдём ревизию, не имеющую ни одного потомка. Это и будет ревизия, запрошенная с помощью
hg incoming --rev revspec
. Найдя эту ревизию, выведем список файлов в ней.
Замечу, что вывод hg incoming
с форматом по‐умолчанию невозможно использовать для такой операции. Надо либо писать свой шаблон с {file_adds}
, {file_mods}
и {file_dels}
, либо взять готовый: --format xml
. Ключ --template
вам здесь не поможет. Написание своего формата сильно сократит код по сравнению с использованием sax парсера для XML, но я предпочёл взять --format xml
.
#!/usr/bin/env python
# vim: fileencoding=utf-8
from __future__ import unicode_literals, division
from xml import sax
from subprocess import check_call, Popen, PIPE
from shutil import rmtree
from tempfile import mkdtemp
class MercurialRevision(object):
__slots__ = ('rev', 'hex',
'tags', 'bookmarks', 'branch',
'parents', 'children',
'added', 'removed', 'modified',
'copies',
'files',)
def __init__(self, rev, hex):
self.rev = rev
self.hex = hex
self.parents = []
self.children = []
self.added = set()
self.removed = set()
self.modified = set()
self.copies = {}
self.tags = set()
self.bookmarks = set()
self.branch = None
self.files = set()
def __str__(self):
return '<revision>'.format(hex=self.hex, rev=self.rev)
def __repr__(self):
return '{0}({rev!r}, {hex!r})'.format(self.__class__.__name__, hex=self.hex, rev=self.rev)
def __hash__(self):
return int(self.hex, 16)
class MercurialHandler(sax.handler.ContentHandler):
def startDocument(self):
self.curpath = []
self.currev = None
nullrev = MercurialRevision(-1, '0' * 40)
self.revisions_rev = {nullrev.rev : nullrev}
self.revisions_hex = {nullrev.hex : nullrev}
self.tags = {}
self.bookmarks = {}
self.characters_fun = None
self.last_data = None
def add_tag(self, tag):
self.currev.tags.add(tag)
self.tags[tag] = self.currev
def add_bookmark(self, bookmark):
self.currev.bookmarks.add(bookmark)
self.bookmarks[bookmark] = self.currev
def characters(self, data):
if self.characters_fun:
if not self.last_data:
self.last_data = data
else:
self.last_data += data
def startElement(self, name, attributes):
if name == 'log':
assert not self.curpath
assert not self.currev
elif name == 'logentry':
assert self.curpath == ['log']
assert not self.currev
self.currev = MercurialRevision(int(attributes['revision']), attributes['node'])
else:
assert self.currev
if name == 'tag':
assert self.curpath[-1] == 'logentry'
self.characters_fun = self.add_tag
elif name == 'bookmark':
assert self.curpath[-1] == 'logentry'
self.characters_fun = self.add_bookmark
elif name == 'parent':
assert self.curpath[-1] == 'logentry'
self.currev.parents.append(self.revisions_hex[attributes['node']])
elif name == 'branch':
assert self.curpath[-1] == 'logentry'
self.characters_fun = lambda branch: self.currev.__setattr__('branch', branch)
elif name == 'path':
assert self.curpath[-1] == 'paths'
if attributes['action'] == 'M':
self.characters_fun = self.currev.modified.add
elif attributes['action'] == 'A':
self.characters_fun = self.currev.added.add
elif attributes['action'] == 'R':
self.characters_fun = self.currev.removed.add
elif name == 'copy':
assert self.curpath[-1] == 'copies'
self.characters_fun = (lambda destination, source=attributes['source']:
self.currev.copies.__setitem__(source, destination))
self.curpath.append(name)
def endElement(self, name):
assert self.curpath or self.curpath[-1] == ['log']
assert self.curpath[-1] == name
if name == 'logentry':
if not self.currev.parents:
self.currev.parents.append(self.revisions_rev[self.currev.rev - 1])
for parent in self.currev.parents:
parent.children.append(self.currev)
self.revisions_hex[self.currev.hex] = self.currev
self.revisions_rev[self.currev.rev] = self.currev
self.currev = None
if self.last_data is None:
if self.characters_fun:
self.characters_fun('')
else:
assert self.characters_fun
self.characters_fun(self.last_data)
self.characters_fun = None
self.last_data = None
self.curpath.pop()
def export_result(self):
heads = {revision for revision in self.revisions_hex.values()
if not revision.children
or all(child.branch != revision.branch for child in revision.children)}
# heads contains the same revisions as `hg heads --closed`
tips = {head for head in heads if not head.children}
return {
'heads': heads,
'tips': tips,
'tags': self.tags,
'bookmarks': self.bookmarks,
'revisions_hex': self.revisions_hex,
'revisions_rev': self.revisions_rev,
'root': self.revisions_rev[-1],
}
class MercurialRemoteParser(object):
__slots__ = ('parser', 'handler', 'tmpdir')
def __init__(self, tmpdir=None):
self.parser = sax.make_parser()
self.handler = MercurialHandler()
self.parser.setContentHandler(self.handler)
self.tmpdir = tmpdir or mkdtemp(suffix='.hg')
self.init_tmpdir()
def init_tmpdir(self):
check_call(['hg', 'init', self.tmpdir])
def delete_tmpdir(self):
if self.tmpdir and rmtree:
rmtree(self.tmpdir)
__del__ = delete_tmpdir
def __enter__(self):
return self
def __exit__(self, *args, **kwargs):
self.delete_tmpdir()
@staticmethod
def generate_files(parsing_result):
toprocess = [parsing_result['root']]
processed = set()
while toprocess:
revision = toprocess.pop(0)
if revision.parents:
# Inherit files from the first parent
assert not revision.files
if revision.parents[0] not in processed:
assert toprocess
toprocess.append(revision)
continue
revision.files.update(revision.parents[0].files)
# Then apply delta found in log
assert not (revision.files & revision.added)
revision.files.update(revision.added)
assert revision.files > revision.removed
revision.files -= revision.removed
assert revision.files > revision.modified, (
'Expected to find the following files: ' + ','.join(
file for file in revision.modified if not file in revision.files))
processed.add(revision)
toprocess.extend(child for child in revision.children
if not child in processed and not child in toprocess)
assert set(parsing_result['revisions_rev'].values()) == processed
return parsing_result
def parse_url(self, url, rev_name=None):
p = Popen(['hg', '--repository', self.tmpdir,
'incoming', '--style', 'xml', '--verbose', url,
] + (['--rev', rev_name] if rev_name else []),
stdout=PIPE)
p.stdout.readline() # Skip “comparing with {url}” header
self.parser.parse(p.stdout)
parsing_result = self.handler.export_result()
self.generate_files(parsing_result)
return parsing_result
if __name__ == '__main__':
import sys
def print_files(revision):
for file in revision.files:
print file
remote_url = sys.argv[1]
rev_name = sys.argv[2]
with MercurialRemoteParser() as remote_parser:
parsing_result = remote_parser.parse_url(remote_url, rev_name=rev_name)
assert len(parsing_result['tips']) == 1, 'Found more then one head'
print_files(next(iter(parsing_result['tips'])))
# vim: tw=100 ft=python ts=4 sts=4 sw=4
</revision>
Использование: python -O list_hg_files.py https://bitbucket.org/ZyX_I/aurum tip
. Оба аргумента (URL удалённого репозитория и обозначение ревизии) обязательны.
Автор: ZyXI