"""
.. module:: repository
   :platform: Unix, Windows
   :synopsis: A module for examining a single git repository

.. moduleauthor:: Will McGinnis <will@pedalwrencher.com>
"""
import os
import sys
import datetime
import time
import numpy as np
import json
import logging
import tempfile
import shutil
from git import Repo, GitCommandError
from pandas import DataFrame, to_datetime
__author__ = 'willmcginnis'
class Repository(object):
"""
    The base class for a generic git repository, from which to gather statistics. The object encapsulates a single
gitpython Repo instance.
:param working_dir: the directory of the git repository, meaning a .git directory is in it (default None=cwd)
:return:
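    Example (a minimal usage sketch; the path and URI shown are hypothetical)::

        from gitpandas import Repository

        # a local repository: the directory containing the .git directory
        repo = Repository(working_dir='/path/to/repo')

        # or a remote URI, which is cloned into a temporary directory and removed when the object is deleted
        repo = Repository(working_dir='git://github.com/user/project.git')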
"""
def __init__(self, working_dir=None, verbose=False):
self.verbose = verbose
self.log = logging.getLogger('gitpandas')
self.__delete_hook = False
self._git_repo_name = None
if working_dir is not None:
            # treat a working_dir beginning with 'git' (e.g. git:// or git@) as a remote URI and clone it
            if working_dir[:3] == 'git':
if self.verbose:
print('cloning repository: %s into a temporary location' % (working_dir, ))
dir_path = tempfile.mkdtemp()
self.repo = Repo.clone_from(working_dir, dir_path)
                # remote URIs use '/' separators regardless of platform
                self._git_repo_name = working_dir.split('/')[-1].split('.')[0]
self.git_dir = dir_path
self.__delete_hook = True
else:
self.git_dir = working_dir
self.repo = Repo(self.git_dir)
else:
self.git_dir = os.getcwd()
self.repo = Repo(self.git_dir)
if self.verbose:
print('Repository [%s] instantiated at directory: %s' % (self._repo_name(), self.git_dir))
def __del__(self):
"""
On delete, clean up any temporary repositories still hanging around
:return:
"""
if self.__delete_hook:
if os.path.exists(self.git_dir):
shutil.rmtree(self.git_dir)
    def is_bare(self):
"""
        Returns a boolean for whether the repository is bare.
:return: bool
"""
return self.repo.bare
    def has_coverage(self):
"""
        Returns a boolean for whether a parseable .coverage file can be found in the repository.
:return: bool
"""
if os.path.exists(self.git_dir + os.sep + '.coverage'):
try:
with open(self.git_dir + os.sep + '.coverage', 'r') as f:
blob = f.read()
blob = blob.split('!')[2]
_ = json.loads(blob)
return True
            except Exception:
                # any failure to read or parse the file means it isn't usable
                return False
else:
return False
    def coverage(self):
"""
If there is a .coverage file available, this will attempt to form a DataFrame with that information in it, which
will contain the columns:
* filename
* lines_covered
* total_lines
* coverage
If it can't be found or parsed, an empty DataFrame of that form will be returned.
:return: DataFrame
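        Example (a sketch; assumes ``repo`` is a Repository instance whose working directory contains a .coverage file)::

            cov = repo.coverage()
            print(cov.sort_values(by='coverage').head())  # least-covered files first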
"""
if not self.has_coverage():
return DataFrame(columns=['filename', 'lines_covered', 'total_lines', 'coverage'])
with open(self.git_dir + os.sep + '.coverage', 'r') as f:
blob = f.read()
blob = blob.split('!')[2]
cov = json.loads(blob)
ds = []
for filename in cov['lines'].keys():
            idx = 0
            # count the lines in the source file by iterating to its end
            with open(filename, 'r') as f:
                for idx, l in enumerate(f):
                    pass
            num_lines = idx + 1
short_filename = filename.split(self.git_dir + os.sep)[1]
ds.append([short_filename, len(cov['lines'][filename]), num_lines])
df = DataFrame(ds, columns=['filename', 'lines_covered', 'total_lines'])
df['coverage'] = df['lines_covered'] / df['total_lines']
return df
    def commit_history(self, branch='master', limit=None, extensions=None, ignore_dir=None, days=None):
"""
Returns a pandas DataFrame containing all of the commits for a given branch. Included in that DataFrame will be
the columns:
* date (index)
* author
* committer
* message
* lines
* insertions
* deletions
* net
:param branch: the branch to return commits for
:param limit: (optional, default=None) a maximum number of commits to return, None for no limit
:param extensions: (optional, default=None) a list of file extensions to return commits for
:param ignore_dir: (optional, default=None) a list of directory names to ignore
        :param days: (optional, default=None) number of days of history to return; only used when limit is None
:return: DataFrame
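        Example (a minimal sketch; ``repo`` is a Repository instance and the filter values are illustrative)::

            ch = repo.commit_history(branch='master', extensions=['py'], ignore_dir=['docs', 'tests'])
            print(ch['net'].sum())  # net lines added over the filtered history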
"""
# setup the data-set of commits
if limit is None:
if days is None:
ds = [[
x.author.name,
x.committer.name,
x.committed_date,
x.message,
self.__check_extension(x.stats.files, extensions, ignore_dir)
] for x in self.repo.iter_commits(branch, max_count=sys.maxsize)]
else:
ds = []
c_date = time.time()
commits = self.repo.iter_commits(branch, max_count=sys.maxsize)
dlim = time.time() - days * 24 * 3600
while c_date > dlim:
                    try:
                        # next() works with both python 2 and 3 iterators
                        x = next(commits)
                    except StopIteration:
                        break
ds.append([
x.author.name,
x.committer.name,
x.committed_date,
x.message,
self.__check_extension(x.stats.files, extensions, ignore_dir)
])
c_date = x.committed_date
else:
ds = [[
x.author.name,
x.committer.name,
x.committed_date,
x.message,
self.__check_extension(x.stats.files, extensions, ignore_dir)
] for x in self.repo.iter_commits(branch, max_count=limit)]
        # aggregate lines/insertions/deletions over the (filtered) files touched in each commit
        ds = [x[:-1] + [
            sum(stats['lines'] for stats in x[-1].values()),
            sum(stats['insertions'] for stats in x[-1].values()),
            sum(stats['deletions'] for stats in x[-1].values()),
            sum(stats['insertions'] for stats in x[-1].values()) - sum(stats['deletions'] for stats in x[-1].values())
        ] for x in ds if len(x[-1]) > 0]
# make it a pandas dataframe
df = DataFrame(ds, columns=['author', 'committer', 'date', 'message', 'lines', 'insertions', 'deletions', 'net'])
# format the date col and make it the index
df['date'] = to_datetime(df['date'].map(lambda x: datetime.datetime.fromtimestamp(x)))
df.set_index(keys=['date'], drop=True, inplace=True)
return df
    def file_change_history(self, branch='master', limit=None, extensions=None, ignore_dir=None):
"""
Returns a DataFrame of all file changes (via the commit history) for the specified branch. This is similar to
the commit history DataFrame, but is one row per file edit rather than one row per commit (which may encapsulate
many file changes). Included in the DataFrame will be the columns:
* date (index)
* author
* committer
* message
* filename
* insertions
* deletions
:param branch: the branch to return commits for
:param limit: (optional, default=None) a maximum number of commits to return, None for no limit
:param extensions: (optional, default=None) a list of file extensions to return commits for
:param ignore_dir: (optional, default=None) a list of directory names to ignore
:return: DataFrame
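        Example (a sketch with illustrative filter values)::

            fch = repo.file_change_history(branch='master', limit=100, extensions=['py'])
            print(fch.groupby('filename')['insertions'].sum().sort_values(ascending=False).head())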
"""
# setup the dataset of commits
if limit is None:
ds = [[
x.author.name,
x.committer.name,
x.committed_date,
x.message,
x.name_rev.split()[0],
self.__check_extension(x.stats.files, extensions, ignore_dir)
] for x in self.repo.iter_commits(branch, max_count=sys.maxsize)]
else:
ds = [[
x.author.name,
x.committer.name,
x.committed_date,
x.message,
x.name_rev.split()[0],
self.__check_extension(x.stats.files, extensions, ignore_dir)
] for x in self.repo.iter_commits(branch, max_count=limit)]
        # explode each commit into one row per file changed; commits touching no (filtered) files yield no rows
        ds = [x[:-1] + [fn, x[-1][fn]['insertions'], x[-1][fn]['deletions']] for x in ds for fn in x[-1].keys()]
# make it a pandas dataframe
df = DataFrame(ds, columns=['author', 'committer', 'date', 'message', 'rev', 'filename', 'insertions', 'deletions'])
# format the date col and make it the index
df['date'] = to_datetime(df['date'].map(lambda x: datetime.datetime.fromtimestamp(x)))
df.set_index(keys=['date'], drop=True, inplace=True)
return df
    def file_change_rates(self, branch='master', limit=None, extensions=None, ignore_dir=None, coverage=False):
"""
        This function will return a DataFrame containing some basic aggregations of the file change history data, and
        optionally test coverage data from a coverage.py .coverage file. The aim is to identify files in the project
        which have abnormal edit rates: files that change frequently without growing in size. If a file has a high
        edit rate and poor test coverage, it is a strong candidate for writing more tests.
:param branch: (optional, default=master) the branch to return commits for
:param limit: (optional, default=None) a maximum number of commits to return, None for no limit
:param extensions: (optional, default=None) a list of file extensions to return commits for
:param ignore_dir: (optional, default=None) a list of directory names to ignore
:param coverage: (optional, default=False) a bool for whether or not to attempt to join in coverage data.
:return: DataFrame
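        Example (a sketch; couples the edit-rate metric with coverage data when a .coverage file is present)::

            fcr = repo.file_change_rates(extensions=['py'], coverage=True)
            # files with a high edit rate and low coverage are strong candidates for new tests
            print(fcr.sort_values(by='edit_rate', ascending=False).head())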
"""
fch = self.file_change_history(branch=branch, limit=limit, extensions=extensions, ignore_dir=ignore_dir)
fch.reset_index(level=0, inplace=True)
file_history = fch.groupby('filename').agg(
{
'insertions': [np.sum, np.max, np.mean],
'deletions': [np.sum, np.max, np.mean],
'message': lambda x: ','.join(['"' + str(y) + '"' for y in x]),
'committer': lambda x: ','.join(['"' + str(y) + '"' for y in x]),
'author': lambda x: ','.join(['"' + str(y) + '"' for y in x]),
'date': [np.max, np.min]
}
)
file_history.columns = [' '.join(col).strip() for col in file_history.columns.values]
file_history = file_history.rename(columns={
'message <lambda>': 'messages',
'committer <lambda>': 'committers',
'insertions sum': 'total_insertions',
'insertions amax': 'max_insertions',
'insertions mean': 'mean_insertions',
'author <lambda>': 'authors',
'date amax': 'max_date',
'date amin': 'min_date',
'deletions sum': 'total_deletions',
'deletions amax': 'max_deletions',
'deletions mean': 'mean_deletions'
})
# get some building block values for later use
file_history['net_change'] = file_history['total_insertions'] - file_history['total_deletions']
file_history['abs_change'] = file_history['total_insertions'] + file_history['total_deletions']
file_history['delta_time'] = file_history['max_date'] - file_history['min_date']
        # delta_time is a timedelta64 in nanoseconds; convert to whole days (at least one)
        file_history['delta_days'] = file_history['delta_time'].map(lambda x: np.ceil(x.item() / (24 * 3600 * 1e9) + 0.01))
# calculate metrics
file_history['net_rate_of_change'] = file_history['net_change'] / file_history['delta_days']
file_history['abs_rate_of_change'] = file_history['abs_change'] / file_history['delta_days']
file_history['edit_rate'] = file_history['abs_rate_of_change'] - file_history['net_rate_of_change']
file_history['unique_committers'] = file_history['committers'].map(lambda x: len(set(x.split(','))))
# reindex
file_history = file_history.reindex(columns=['unique_committers', 'abs_rate_of_change', 'net_rate_of_change', 'net_change', 'abs_change', 'edit_rate'])
file_history.sort_values(by=['edit_rate'], inplace=True)
if coverage and self.has_coverage():
file_history = file_history.merge(self.coverage(), left_index=True, right_on='filename', how='outer')
file_history.set_index(keys=['filename'], drop=True, inplace=True)
return file_history
@staticmethod
def __check_extension(files, extensions, ignore_dir):
"""
Internal method to filter a list of file changes by extension and ignore_dirs.
:param files:
:param extensions: a list of file extensions to return commits for
:param ignore_dir: a list of directory names to ignore
:return: dict
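        Example (illustrative only; the dict shape mirrors gitpython's ``Commit.stats.files``)::

            files = {
                'src/a.py': {'lines': 10, 'insertions': 7, 'deletions': 3},
                'docs/b.md': {'lines': 2, 'insertions': 2, 'deletions': 0},
            }
            # with extensions=['py'] and ignore_dir=['docs'], only 'src/a.py' is kept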
"""
if extensions is None:
return files
if ignore_dir is None:
ignore_dir = []
else:
ignore_dir = [os.sep + str(x).replace('/', '').replace('\\', '') + os.sep for x in ignore_dir]
        out = {}
        for key in files.keys():
            if key.split('.')[-1] in extensions:
                if not any(x in key for x in ignore_dir):
                    out[key] = files[key]
return out
    def blame(self, extensions=None, ignore_dir=None, rev='HEAD', committer=True, by='repository'):
"""
Returns the blame from the current HEAD of the repository as a DataFrame. The DataFrame is grouped by committer
name, so it will be the sum of all contributions to the repository by each committer. As with the commit history
method, extensions and ignore_dirs parameters can be passed to exclude certain directories, or focus on certain
file extensions. The DataFrame will have the columns:
* committer
* loc
:param extensions: (optional, default=None) a list of file extensions to return commits for
:param ignore_dir: (optional, default=None) a list of directory names to ignore
:param rev: (optional, default=HEAD) the specific revision to blame
        :param committer: (optional, default=True) True if the committer should be reported, False for the author
:param by: (optional, default=repository) whether to group by repository or by file
:return: DataFrame
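        Example (a sketch; illustrative arguments)::

            b = repo.blame(extensions=['py'], ignore_dir=['docs'], committer=False, by='repository')
            print(b.sort_values(by='loc', ascending=False))  # LOC attributed to each author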
"""
if ignore_dir is None:
ignore_dir = []
blames = []
file_names = [x for x in self.repo.git.log(pretty='format:', name_only=True, diff_filter='A').split('\n') if x.strip() != '']
for file in file_names:
            if not any(x in file for x in ignore_dir):
if extensions is not None:
if file.split('.')[-1] not in extensions:
continue
try:
blames.append([x + [str(file).replace(self.git_dir + '/', '')] for x in self.repo.blame(rev, str(file).replace(self.git_dir + '/', ''))])
                except GitCommandError:
                    # the file may not exist at this rev (e.g. it was deleted); skip it
                    pass
blames = [item for sublist in blames for item in sublist]
if committer:
if by == 'repository':
blames = DataFrame([[x[0].committer.name, len(x[1])] for x in blames], columns=['committer', 'loc']).groupby('committer').agg({'loc': np.sum})
elif by == 'file':
blames = DataFrame([[x[0].committer.name, len(x[1]), x[2]] for x in blames], columns=['committer', 'loc', 'file']).groupby(['committer', 'file']).agg({'loc': np.sum})
else:
if by == 'repository':
blames = DataFrame([[x[0].author.name, len(x[1])] for x in blames], columns=['author', 'loc']).groupby('author').agg({'loc': np.sum})
elif by == 'file':
blames = DataFrame([[x[0].author.name, len(x[1]), x[2]] for x in blames], columns=['author', 'loc', 'file']).groupby(['author', 'file']).agg({'loc': np.sum})
return blames
    def revs(self, branch='master', limit=None, skip=None, num_datapoints=None):
"""
Returns a dataframe of all revision tags and their timestamps. It will have the columns:
* date
* rev
:param branch: (optional, default 'master') the branch to work in
:param limit: (optional, default None), the maximum number of revisions to return, None for no limit
:param skip: (optional, default None), the number of revisions to skip. Ex: skip=2 returns every other revision, None for no skipping.
:param num_datapoints: (optional, default=None) if limit and skip are none, and this isn't, then num_datapoints evenly spaced revs will be used
:return: DataFrame
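        Example (a sketch; asks for roughly ten evenly spaced revisions)::

            r = repo.revs(branch='master', num_datapoints=10)
            print(r)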
"""
if limit is None and skip is None and num_datapoints is not None:
limit = sum(1 for _ in self.repo.iter_commits())
skip = int(float(limit) / num_datapoints)
else:
if limit is None:
limit = sys.maxsize
elif skip is not None:
limit = limit * skip
ds = [[x.committed_date, x.name_rev.split(' ')[0]] for x in self.repo.iter_commits(branch, max_count=limit)]
df = DataFrame(ds, columns=['date', 'rev'])
        if skip is not None:
            if skip == 0:
                skip = 1
            if df.shape[0] >= skip:
                df = df.iloc[range(0, df.shape[0], skip)]
            else:
                df = df.iloc[[0]]
            df.reset_index(drop=True, inplace=True)
return df
    def cumulative_blame(self, branch='master', extensions=None, ignore_dir=None, limit=None, skip=None, num_datapoints=None, committer=True):
"""
Returns the blame at every revision of interest. Index is a datetime, column per committer, with number of lines
blamed to each committer at each timestamp as data.
:param branch: (optional, default 'master') the branch to work in
:param limit: (optional, default None), the maximum number of revisions to return, None for no limit
:param skip: (optional, default None), the number of revisions to skip. Ex: skip=2 returns every other revision, None for no skipping.
:param extensions: (optional, default=None) a list of file extensions to return commits for
:param ignore_dir: (optional, default=None) a list of directory names to ignore
:param num_datapoints: (optional, default=None) if limit and skip are none, and this isn't, then num_datapoints evenly spaced revs will be used
        :param committer: (optional, default=True) True if the committer should be reported, False for the author
:return: DataFrame
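        Example (a sketch; the result plots naturally as a stacked area chart, matplotlib assumed)::

            cb = repo.cumulative_blame(branch='master', num_datapoints=25)
            cb.plot(kind='area', stacked=True)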
"""
revs = self.revs(branch=branch, limit=limit, skip=skip, num_datapoints=num_datapoints)
        # get the commit history to stub out committers (hacky and slow)
        if sys.version_info.major == 2:
            committers = set([x.committer.name for x in self.repo.iter_commits(branch, max_count=sys.maxsize)])
        else:
            committers = {x.committer.name for x in self.repo.iter_commits(branch, max_count=sys.maxsize)}
        # use a distinct loop variable so the committer flag argument isn't shadowed
        for committer_name in committers:
            revs[committer_name] = 0
if self.verbose:
print('Beginning processing for cumulative blame:')
# now populate that table with some actual values
for idx, row in revs.iterrows():
if self.verbose:
print('%s. [%s] getting blame for rev: %s' % (str(idx), datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'), row.rev, ))
blame = self.blame(extensions=extensions, ignore_dir=ignore_dir, rev=row.rev, committer=committer)
            for committer_name in committers:
                try:
                    loc = blame.loc[committer_name, 'loc']
                    revs.at[idx, committer_name] = loc
                except KeyError:
                    pass
del revs['rev']
revs['date'] = to_datetime(revs['date'].map(lambda x: datetime.datetime.fromtimestamp(x)))
revs.set_index(keys=['date'], drop=True, inplace=True)
revs = revs.fillna(0.0)
        # drop committer columns with zero blamed lines at every revision ('date' is already the index here)
        for col in revs.columns.values:
            if revs[col].sum() == 0:
                del revs[col]
# drop 0 rows
keep_idx = []
committers = [x for x in revs.columns.values if x != 'date']
for idx, row in revs.iterrows():
if sum([row[x] for x in committers]) > 0:
keep_idx.append(idx)
        revs = revs.loc[keep_idx]
return revs
    def branches(self):
"""
        Returns a DataFrame of all branches, local and remote. The DataFrame will have the columns:
* repository
* branch
* local
:returns: DataFrame
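        Example (a sketch)::

            b = repo.branches()
            print(b[b['local']]['branch'].tolist())  # names of the local branches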
"""
# first pull the local branches
local_branches = self.repo.branches
data = [[x.name, True] for x in list(local_branches)]
# then the remotes
remote_branches = self.repo.git.branch(all=True).split('\n')
if sys.version_info.major == 2:
remote_branches = set([x.split('/')[-1] for x in remote_branches if 'remotes' in x])
else:
remote_branches = {x.split('/')[-1] for x in remote_branches if 'remotes' in x}
data += [[x, False] for x in remote_branches]
df = DataFrame(data, columns=['branch', 'local'])
df['repository'] = self._repo_name()
return df
def _repo_name(self):
"""
Returns the name of the repository, using the local directory name.
:returns: str
"""
if self._git_repo_name is not None:
return self._git_repo_name
else:
reponame = self.repo.git_dir.split(os.sep)[-2]
if reponame.strip() == '':
return 'unknown_repo'
return reponame
def __str__(self):
"""
A pretty name for the repository object.
:returns: str
"""
return 'git repository: %s at: %s' % (self._repo_name(), self.git_dir, )
def __repr__(self):
"""
A unique name for the repository object.
:returns: str
"""
return str(self.git_dir)
    def bus_factor(self, by='repository', extensions=None, ignore_dir=None):
"""
        An experimental heuristic for the bus factor (also known as truck factor) of a repository, calculated from the
        current distribution of blame in the repository's primary branch. The factor is the fewest number of
        contributors whose contributions make up at least 50% of the codebase's LOC.
:param extensions: (optional, default=None) a list of file extensions to return commits for
:param ignore_dir: (optional, default=None) a list of directory names to ignore
:param by: (optional, default=repository) whether to group by repository or by file
:return:
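        Example (a sketch; illustrative filter values)::

            bf = repo.bus_factor(extensions=['py'], ignore_dir=['docs'])
            print(bf)  # one row: the repository name and its bus factor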
"""
if by == 'file':
raise NotImplementedError('File-wise bus factor')
blame = self.blame(extensions=extensions, ignore_dir=ignore_dir, by=by)
blame = blame.sort_values(by=['loc'], ascending=False)
total = blame['loc'].sum()
cumulative = 0
tc = 0
        # walk down the contributors, most LOC first, until at least half the codebase is accounted for
        for idx in range(blame.shape[0]):
            cumulative += blame['loc'].iloc[idx]
tc += 1
if cumulative >= total / 2:
break
return DataFrame([[self._repo_name(), tc]], columns=['repository', 'bus factor'])
    def file_owner(self, rev, filename):
"""
"""
try:
blame = self.repo.blame(rev, os.path.join(self.git_dir, filename))
blame = DataFrame([[x[0].committer.name, len(x[1])] for x in blame], columns=['committer', 'loc']).groupby('committer').agg({'loc': np.sum})
if blame.shape[0] > 0:
return blame['loc'].idxmax()
else:
return None
except GitCommandError as e:
return None
class GitFlowRepository(Repository):
"""
    A special case where git flow is followed, so we know something about the branching scheme.
"""
    def __init__(self):
        super(GitFlowRepository, self).__init__()