#!/usr/bin/python -tt
"""
Find archives and other non-text files in a source tree so they can be reviewed.

Each file is run through the categorizers below (tarfile, zipfile, binary,
falling back to text), then handed to the analyzer registered for its kind.
The collected results are pretty-printed on stdout.
"""
import functools
import os
import os.path
import pprint
import tarfile
import zipfile
from collections import namedtuple

import appeal

# Replace with argparse after Proof of concept
app = appeal.Appeal()

# Option: Find new and changed files
# Analyze any non-text files
# Option: Analyze any non-python scripts
# If we can't tell what they are, then ask reviewer to examine files
# If we can tell that they are a certain thing,
# then look further
#
#
# End results:
# Print summary of files with any information that we are able to extract about
# them (for instance, open tarballs and look at the files inside of them)
#
# Print results of any analysis.

# List them as json output on stdout.
# Write the list to a file.
# Consult the file for any files which have been checked.

# Attempt to find out what the file is.
# Check whether there is some way to analyze the length of the file?
# If an archive, look inside of it.
# See what other files reference the file.
# If nothing references the file, flag that.

# Flag the connection between one file and another
# If a connection to a binary file is added, flag that for review.

# filename: path of the file as found on disk
# kind: category name; used as the key into ANALYZERS
# file_parts: which part of the file to examine (currently always "all")
FileRecord = namedtuple("FileRecord", ("filename", "kind", "file_parts"))


#
# Categorizers
#

# Control characters that routinely appear in text files.  str.isprintable()
# rejects all of them, so they have to be allowed explicitly.
_TEXT_CONTROL_CHARS = frozenset("\t\n\r\f\v")


@functools.lru_cache
def read_file(filename, encoding="utf-8"):
    """
    Read a file's contents into memory.

    Done in a function so we can cache the contents but it's very naive.  We
    may want to rearchitect this in the future.

    :arg filename: Path of the file to read.
    :kwarg encoding: Text encoding used to decode the file.
    :returns: The decoded contents of the file as a str.
    :raises UnicodeError: If the file cannot be decoded with ``encoding``.
    """
    # Use a context manager so the file handle is closed promptly instead of
    # whenever the garbage collector gets around to it.
    with open(filename, encoding=encoding) as f:
        return f.read()


def is_zipfile(filename) -> bool:
    """Determine whether this is a zipfile."""
    return zipfile.is_zipfile(filename)


def is_tarfile(filename) -> bool:
    """Determine whether this is a tarfile."""
    return tarfile.is_tarfile(filename)


def is_binary(filename) -> bool:
    """
    Call it binary if the file is not decodable as utf-8 or has non-printable characters.

    Ordinary text whitespace (tab, newline, carriage return, form feed and
    vertical tab) does not count as non-printable; otherwise every multi-line
    text file would be reported as binary.
    """
    try:
        contents = read_file(filename)
    except UnicodeError:
        # Non-utf-8 files are treated as binary
        return True

    # If a file contains non-printable characters, then it is treated as binary
    return any(
        not char.isprintable() and char not in _TEXT_CONTROL_CHARS
        for char in contents
    )


# Note: the order of categorizers may be important.  Things that are more
# specific should be run first and less specific later.  (For instance, tar file
# before tar.gz before binary).
CATEGORIZERS = (is_tarfile, is_zipfile, is_binary,)

# The categorizers are plain predicates, so the kind recorded for a matching
# file (which is also the key into ANALYZERS) is looked up here.
_KIND_FOR_CATEGORIZER = {
    is_tarfile: "tarfile",
    is_zipfile: "zipfile",
    is_binary: "binary",
}


def _categorize(filepath) -> str:
    """Return the kind of the first categorizer matching filepath, or "text"."""
    for categorizer in CATEGORIZERS:
        if categorizer(filepath):
            return _KIND_FOR_CATEGORIZER[categorizer]
    return "text"


#
# Analyzers
#

def tarfile_analyzer(file_record):
    """
    Look inside of a tar archive.

    :arg file_record: FileRecord whose ``filename`` is the archive to open.
    :returns: List of the member names stored in the archive.
    """
    # Open tarfile with transparent decompression
    with tarfile.open(file_record.filename, "r:*") as archive:
        return archive.getnames()


def zipfile_analyzer(file_record):
    """Placeholder: zip archives are not analyzed yet.  Returns None."""
    pass


def binary_analyzer(file_record):
    """Placeholder: binary files are not analyzed yet.  Returns None."""
    # Move over beethoven
    pass


# Maps FileRecord.kind to the function which analyzes that kind of file.
ANALYZERS = {"text": lambda x: False,
             "binary": binary_analyzer,
             "tarfile": tarfile_analyzer,
             "zipfile": zipfile_analyzer,
             }


#
# Main application logic
#

def detect_from_tree(root_of_tree):
    """
    Scan all files in a directory tree and categorize them for analysis.

    :arg root_of_tree: Directory to walk recursively.
    :returns: List of FileRecord, one per file found.  ``kind`` is "tarfile",
        "zipfile", "binary" or, when no categorizer matches, "text".
    """
    files_to_examine = []

    for root_dir, _directories, files in os.walk(root_of_tree):
        for filename in files:
            filepath = os.path.join(root_dir, filename)
            files_to_examine.append(
                FileRecord(filepath, _categorize(filepath), "all"))

    return files_to_examine


def detect_from_repo(git_repo, base_rev, current_rev):
    """
    Placeholder for scanning the files changed between two git revisions.

    Not implemented yet: the arguments are ignored and an empty list is
    returned.
    """
    files_to_examine = []
    # Checkout git repo
    # Switch to branch/base_rev
    # Gather information about the difference between base_rev and current_rev
    # If binary, flag
    # If text, then show the code
    return files_to_examine


def analyze(files_to_examine):
    """
    Run the analyzer matching each file's kind.

    :arg files_to_examine: Iterable of FileRecord.
    :returns: Dict mapping each filename to whatever its analyzer returned.
        Files whose kind has no analyzer are left out.
    """
    results = {}
    for file_rec in files_to_examine:
        try:
            analyzer_func = ANALYZERS[file_rec.kind]
        except KeyError:
            # If there are no analyzers for this file type, we don't need to do
            # anything.
            continue

        results[file_rec.filename] = analyzer_func(file_rec)

    return results


def display_report(report_data, format="text"):
    """
    Pretty-print the report on stdout.

    ``format`` is accepted but currently ignored.
    """
    pprint.pprint(report_data)


# This is written like this because it will be ported to argparse in the future
# and argparse will use this structure.
def _main(root_of_tree=None, repo=None, base_rev=None, current_rev=None):
    """
    Detect the files to examine, analyze them, and print the report.

    :kwarg root_of_tree: Directory tree to scan.  Takes precedence over repo.
    :kwarg repo: Git repository to scan when root_of_tree is not given.
    :kwarg base_rev: Passed through to detect_from_repo.
    :kwarg current_rev: Passed through to detect_from_repo.
    :returns: 0 on success, 255 if neither a tree nor a repo was given.
    """
    if root_of_tree is not None:
        files_to_examine = detect_from_tree(root_of_tree)
    elif repo is not None:
        files_to_examine = detect_from_repo(repo, base_rev, current_rev)
    else:
        # We shouldn't get here.  Appeal should catch problems earlier.
        print("Need --root or --repo information to tell us what to scan.")
        return 255

    report_data = analyze(files_to_examine)
    display_report(report_data)

    return 0


@app.command()
def tree(*, root):
    """Scan every file underneath the directory given by root."""
    canonical_root = os.path.realpath(os.path.expanduser(root))
    if not os.path.exists(canonical_root):
        print(f"{canonical_root} (from {root}) must exist.")
        return 1

    # Hand the status code back rather than dropping it, matching the
    # `return 1` above.
    return _main(root_of_tree=canonical_root)


@app.command()
def git_repo(*, repo, base_rev, current_rev="HEAD"):
    """Scan the files which differ between base_rev and current_rev in repo."""
    return _main(repo=repo, base_rev=base_rev, current_rev=current_rev)


if __name__ == '__main__':
    app.main()