PakistanDatabase.com

Python/Perl/Bash [FREE CODE] [Python] [PoC] SAST Low Hanging Fruit

Dexter

New member
Joined
Apr 30, 2023
Messages
21
Hellcoins
♆472
Username Style (Gradient Colours)
Hi folks,

This is my first collab. It's a modest Python script that :


  1. Downloads N GitHub repos based on a search term (you can filter by language, org, starts, etc.)
  2. Runs three SAST tools: Bandit, Pyright and Semgrep to look for critical vulnerabilities.
  3. If there are no vulns found, then we delete the repo, else we store the details in a .log file. Dead simple.

The idea is to find low hanging fruits based on a high-level criteria.
This is a PoC, so it's not parametrized, commented or well-structured. Sorry about that (I did it in like 30min, lol).

These are the reqs:
You must reply before you can see the hidden data contained here.
This is the code:
Code:
import json
import os
import shutil
import subprocess
from git import Repo
import time
import requests
from tqdm import tqdm

# Reference: https://github.com/settings/tokens
token = "ghp_xxxxxxxxxxxxxxxxxxxxxxxxxxx"

# Reference: https://docs.github.com/en/search-github/searching-on-github/searching-for-repositories
search_query = "forum language:python"

# Reference: https://semgrep.dev/explore
semgrep_command = [
    'semgrep',
    '--config',"p/gitleaks",
    '--config',"p/security-code-scan",
    '--config',"p/docker-compose",
    '--config',"p/insecure-transport",
    '--config',"p/r2c-security-audit",
    '--config',"p/flask",
    '--config',"p/django",
    '--config',"p/security-audit",
    '--config',"p/python",
    '--config',"p/default",
    '--config',"p/owasp-top-ten",
    '--config',"p/cwe-top-25",
    '--config',"p/jwt",
    '--config',"p/sql-injection",
    '--config',"p/secrets",
    '--config',"p/bandit",
    '--config',"p/brakeman",
    '--config',"p/findsecbugs",
    '--config',"p/flawfinder",
    '--config',"p/command-injection",
    '--json'
]


def search_github_repos(query, sort='stars', order='desc', per_page=100, max_pages=1):
    headers = {"Authorization": f"token {token}"}
    base_url = "https://api.github.com/search/repositories"
    repos = []

    page = 1
    while True:
        params = {
            "q": query,
            "sort": sort,
            "order": order,
            "per_page": per_page,
            "page": page
        }
        response = requests.get(base_url, headers=headers, params=params)
        response_data = response.json()
      
        if response.status_code != 200:
            print(f"Error: {response_data.get('message')}")
            break

        page_repos = response_data.get("items", [])

        if not page_repos or (max_pages is not None and page > max_pages):
            break

        print(f"Found {len(page_repos)} repositories on page {page}.")
        repos.extend(page_repos)
        page += 1

        time.sleep(0.5)

    print(f"Total repositories found: {len(repos)}")
    return repos

def run_bandit(repo_path):
    result = subprocess.run(['bandit', '-r', repo_path, '-f', 'json', '-l', 'HIGH', '-c', 'MEDIUM'], capture_output=True, text=True)
    issues = json.loads(result.stdout).get("results", []) if (result.returncode == 3) else []
    return issues

def run_pyright(repo_path):
    result = subprocess.run(['pyright', '--outputjson', repo_path], capture_output=True, text=True)
    issues = json.loads(result.stdout).get("diagnostics", [])
    high_severity_issues = [issue for issue in issues if issue.get("severity") == "warning"]
    return high_severity_issues

def run_semgrep(repo_path):
    semgrep_command.append(repo_path)
    result = subprocess.run(semgrep_command, capture_output=True, text=True)
  
    issues = json.loads(result.stdout).get("results", [])
    high_severity_issues = [issue for issue in issues if issue.get("severity") == "WARNING"]
    return high_severity_issues

def clone_and_analyze_repo(repo_name, repo_html_url, repo_clone_url):
    repo_path = os.path.join(os.getcwd(), repo_name)

    print(f"Cloning {repo_html_url} to {repo_path}...")
    Repo.clone_from(repo_clone_url, repo_path)

    print(f"Performing SAST analysis on {repo_name}...")
    bandit_issues = run_bandit(repo_path)
    pyright_issues = run_pyright(repo_path)
    semgrep_issues = run_semgrep(repo_path)

    all_issues = bandit_issues + pyright_issues + semgrep_issues

    if not all_issues:
        print("No high or critical vulnerabilities found. Deleting repo folder.")
        shutil.rmtree(repo_path)
    else:
        print("High or critical vulnerabilities found. Saving logs.")
        with open(os.path.join(repo_path, "bandit.log"), "w") as bandit_log, \
            open(os.path.join(repo_path, "pyright.log"), "w") as pyright_log, \
            open(os.path.join(repo_path, "semgrep.log"), "w") as semgrep_log:
            bandit_log.write(json.dumps({"results": bandit_issues}, indent=2))
            pyright_log.write(json.dumps({"diagnostics": pyright_issues}, indent=2))
            semgrep_log.write(json.dumps({"results": semgrep_issues}, indent=2))

    return all_issues

def main():
  
    print(f"Searching {search_query} repos...")
    repos = search_github_repos(search_query.replace(' ', '+'))
    print(" ")

    vuln_repos_count = 0
    for repo in tqdm(repos, desc="Processing repos"):
        repo_name = repo.get("name")
        repo_stars = repo.get("stargazers_count")
        repo_html_url = repo.get("html_url")
        repo_clone_url = repo.get("clone_url")

        print(f"Processing {repo_name} ({repo_stars} stars)...")
        has_vulns = clone_and_analyze_repo(repo_name, repo_html_url, repo_clone_url)
        if has_vulns:
            vuln_repos_count += 1
        print(" ")

    print(f"{vuln_repos_count} repos with vulns")
    print("Bye")

if __name__ == "__main__":
    main()
 
Top