Build Magika + GPT File Security Pipeline
Use Google's Magika for byte-accurate file typing and GPT-4o to generate security insights, risk scores, and reports from scan results in a Python workflow.
Initialize Magika and OpenAI for Byte-Level Detection
This masterclass teaches how to create a robust file analysis pipeline by combining Magika—a deep learning model from Google that identifies over 500 file types from raw bytes, ignoring extensions—with OpenAI's GPT-4o for contextual interpretation. Prerequisites: Basic Python, familiarity with APIs, and an OpenAI key. Start by installing pip install magika openai -q, then securely input your API key:
import getpass
from openai import OpenAI
from magika import Magika
api_key = getpass.getpass("OpenAI API Key: ")
client = OpenAI(api_key=api_key)
m = Magika()
Test connectivity: client.models.list() and check Magika with m.get_model_name(). Define a prompt helper for GPT analysis:
def ask_gpt(system: str, user: str, model: "gpt-4o", max_tokens: int = 600) -> str:
resp = client.chat.completions.create(
model=model, max_tokens=max_tokens,
messages=[{"role": "system", "content": system}, {"role": "user", "content": user}]
)
return resp.choices[0].message.content.strip()
Principle: Magika processes bytes directly (m.identify_bytes(raw_bytes) or m.identify_paths(paths)), returning MagikaResult with output.label, output.mime_type, score, output.group, and raw dl.label. Use output.* fields for production (post-thresholding); dl.* for debugging. Common mistake: Relying on extensions—spoofing bypasses them. GPT translates: e.g., prompt for explanation of byte patterns like shebangs (#!/) or magic bytes (%PDF).
For single files, scan bytes:
res = m.identify_bytes(b"#!/usr/bin/env python3\n")
print(res.output.label) # 'python'
print(res.score) # e.g., 0.99
Batch scan directories: results = m.identify_paths([Path('file1'), Path('file2')]). Quality criteria: Scores >90% for high confidence; inspect output.is_text for extractability.
"💬 GPT on how Magika works: Magika uses a deep neural network trained on millions of file bytes to recognize patterns like magic numbers, headers, and structural signatures that uniquely identify file formats, regardless of extensions. This outperforms extension checks because attackers often spoof extensions to hide malware, but byte-level analysis reveals the true format."
Tune Detection for Edge Cases and Threats
Configure Magika(prediction_mode=PredictionMode.HIGH_CONFIDENCE) for conservative scans (blocks low-score ambiguities), MEDIUM_CONFIDENCE for balanced, or BEST_GUESS for exploratory. Test on ambiguous text like b"Hello, world.": High may abstain, Best Guess labels 'text'. Principle: Match mode to risk—HIGH_CONFIDENCE for uploads, BEST_GUESS for forensics. Avoid mistake: Default mode on binaries; always probe prefixes (Magika works from 4-512 bytes via early patterns).
Detect spoofing: Compare output.label vs. expected from extension:
ext = fname.rsplit(".", 1)[-1]
expected = {"pdf": "pdf", "jpg": "jpeg"}.get(ext)
match = res.output.label == expected
threats = [fname if not match else None]
Corpus analysis: Scan mixed bytes, tally Counter(r.output.group) for repo insights (e.g., 40% code, 30% config signals web app). Trade-off: Magika excels on known types but may mislabel novel hybrids; cross-check with output.description.
"💬 GPT on when to use each mode: - HIGH_CONFIDENCE: File uploads in production to minimize false positives on potential malware. - MEDIUM_CONFIDENCE: Code reviews where some ambiguity is tolerable for broader coverage. - BEST_GUESS: Forensics or exploratory scans to get a starting hypothesis even on noisy data."
Deploy Upload Scanner and Forensic Pipeline
Simulate uploads: Create temp dir, write files, batch-scan, apply rules:
BLOCKED_LABELS = {"pe", "elf", "macho"} # Binaries
status = "🚫 BLOCKED" if o.label in BLOCKED_LABELS else "✅ OK" if not mismatch else "⚠️ MISMATCH"
Flag mismatches (e.g., .pdf hiding shell), block executables. For forensics, compute hashlib.sha256(content).hexdigest()[:16], log label, mime_type, is_text. Fit in workflow: Integrate as middleware (e.g., FastAPI @app.post('/upload') calls m.identify_paths). Scale with async batches; monitor scores <0.8.
GPT risk scoring: Feed json.dumps(scan_results) for structured output:
risk_report = ask_gpt("You are a senior security analyst.", f"Results: {json.dumps(scan_results)}. Provide risk summary.")
Quality check: Good pipeline blocks 100% known bad, flags 90% spoofs, reports in JSON.
"💬 GPT threat assessment: For invoice.pdf (shell script): Likely script kiddie dropper; quarantine and static-analysis with VirusTotal. photo.jpg (html): XSS vector via image handler flaw; block HTML in image paths. data.csv (zip): Archive bomb or hidden payload; decompress safely in sandbox. readme.txt (pdf): Polyglot exploit attempt; full byte-scan all 'docs'."
Generate Actionable Reports and Narratives
Structure JSON reports:
report = [{
"filename": name,
"label": o.label,
"mime_type": o.mime_type,
"score": round(res.score, 4),
# ... full MagikaResult fields
} for each file]
with open("/tmp/report.json", "w") as f:
json.dump({"scan_results": report, "exec_summary": exec_summary}, f)
Prompt GPT for audiences: DevSecOps summaries (3 sentences), CISO exec (2 paras), IOC narratives (attack chain). Principle: Always include raw results + interpreted insights; version with Magika 1.0.2 fixes (e.g., res.score unified). Practice: Fork the Colab notebook, test your uploads.
"💬 GPT executive summary: The scan identified mostly legitimate code and config files for a Python web app, but flagged an executable (evil.exe) and spoofed PDF hiding Python code, elevating overall risk to medium. No immediate breaches, but binaries indicate potential supply-chain compromise. Next: Implement auto-quarantine for mismatches, run full AV on blocked files, and audit upload handlers for extension bypasses."
Key Takeaways
- Install Magika/OpenAI, test with
identify_bytes(raw)for extension-proof typing. - Use prediction modes: HIGH_CONFIDENCE for prod uploads, BEST_GUESS for forensics.
- Detect spoofs by comparing label vs. extension map; block {'pe','elf','macho'}.
- Batch-scan dirs, tally groups/labels for repo profiling.
- Prompt GPT with
json.dumps(results)for tailored insights: risks, IOCs, exec summaries. - Export JSON with full fields (output.* prioritized); probe prefixes for perf.
- Avoid: Extension reliance, unprompted GPT (always system-role context).
- Scale: Temp dirs for uploads, SHA prefixes for IOCs.
- Debug:
dl.labelvs.output.labelshows thresholding. - Practice: Run on your codebase, build FastAPI endpoint.