Skip to content

Commit 21a1b95

Browse files
committed
fix: harden custom SAST rule selection + filtering behavior
Signed-off-by: lelia <2418071+lelia@users.noreply.github.com>
1 parent 047aedf commit 21a1b95

2 files changed

Lines changed: 220 additions & 3 deletions

File tree

socket_basics/core/connector/opengrep/__init__.py

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ def scan(self) -> Dict[str, Any]:
4040
rule_files = self.config.build_opengrep_rules() or []
4141
except Exception:
4242
rule_files = []
43+
logger.info(
44+
"OpenGrep config summary: all_languages_enabled=%s all_rules_enabled=%s requested_rule_files=%s",
45+
bool(self.config.get('all_languages_enabled', False)),
46+
bool(self.config.get('all_rules_enabled', False)),
47+
rule_files,
48+
)
4349

4450
# If no languages selected and not explicitly allowing all, skip
4551
if not rule_files and not self.config.get('all_languages_enabled', False):
@@ -51,6 +57,12 @@ def scan(self) -> Dict[str, Any]:
5157
# Check if custom rules mode is enabled
5258
custom_rules_path = self.config.get_custom_rules_path()
5359
custom_rule_files: Dict[str, Path] = {}
60+
logger.info(
61+
"Custom SAST requested=%s custom_path=%s resolved_path=%s",
62+
bool(self.config.get('use_custom_sast_rules', False)),
63+
self.config.get('custom_sast_rule_path', ''),
64+
str(custom_rules_path) if custom_rules_path else '(none)',
65+
)
5466

5567
if custom_rules_path:
5668
logger.info(f"Custom SAST rules enabled, loading from: {custom_rules_path}")
@@ -74,6 +86,11 @@ def scan(self) -> Dict[str, Any]:
7486
filtered = self.config.build_filtered_opengrep_rules() or {}
7587
except Exception:
7688
filtered = {}
89+
if filtered:
90+
filtered_counts = {k: len(v or []) for k, v in filtered.items()}
91+
logger.info("Per-language enabled-rule filters detected: %s", filtered_counts)
92+
else:
93+
logger.info("Per-language enabled-rule filters disabled for this run")
7794

7895
# Debugging: log computed rule files and filtered rules for diagnosis
7996
try:
@@ -91,25 +108,42 @@ def scan(self) -> Dict[str, Any]:
91108
# Process all enabled languages - use filtered rules if specified, otherwise use all rules
92109
for rf in rule_files:
93110
# Check if we have a custom rule file for this language
111+
using_custom_rules = bool(custom_rule_files and rf in custom_rule_files)
94112
if custom_rule_files and rf in custom_rule_files:
95113
p = custom_rule_files[rf]
96-
logger.info(f"Using custom rules for {rf}")
114+
logger.info("Using custom rules for %s from %s", rf, p)
97115
else:
98116
# Fall back to bundled rules
99117
p = Path(rules_dir) / rf
100118
if not p.exists():
101119
logger.debug('Rule file missing: %s', p)
102120
continue
121+
logger.info("Using bundled rules for %s from %s", rf, p)
103122

104123
# Check if this language has specific rules enabled (filtered mode)
105124
if filtered and rf in filtered:
106125
enabled_ids = filtered[rf]
107-
logger.debug(f"Using filtered rules for {rf}: {len(enabled_ids)} rules enabled")
126+
logger.info("Filtering rules for %s: %d enabled IDs configured", rf, len(enabled_ids))
108127
try:
109128
with open(p, 'r') as fh:
110129
data = yaml.safe_load(fh) or {}
111130
all_ids = [r.get('id') for r in (data.get('rules') or []) if r.get('id')]
112-
to_exclude = [rid for rid in all_ids if rid not in (enabled_ids or [])]
131+
# Custom-rule mode can coexist with legacy bundled allowlists.
132+
# If none of the configured enabled IDs match custom IDs, keep all
133+
# custom IDs active to avoid silently disabling user-authored rules.
134+
if using_custom_rules:
135+
matched_enabled_ids = [rid for rid in all_ids if rid in (enabled_ids or [])]
136+
if enabled_ids and not matched_enabled_ids:
137+
logger.warning(
138+
"No configured enabled-rule IDs matched custom rules for %s; using all custom rules from %s",
139+
rf,
140+
p,
141+
)
142+
config_args.extend(['--config', str(p)])
143+
continue
144+
to_exclude = [rid for rid in all_ids if rid not in matched_enabled_ids]
145+
else:
146+
to_exclude = [rid for rid in all_ids if rid not in (enabled_ids or [])]
113147
config_args.extend(['--config', str(p)])
114148
for ex in to_exclude:
115149
config_args.extend(['--exclude-rule', ex])
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
import json
2+
from pathlib import Path
3+
from types import SimpleNamespace
4+
5+
from socket_basics.core.config import Config
6+
from socket_basics.core.connector.opengrep import OpenGrepScanner
7+
8+
9+
def _write_rule_file(path: Path, rule_ids: list[str]) -> None:
    """Write a JSON-encoded rule file holding one rule per entry of ``rule_ids``.

    Each rule gets the given ID, the language list ``["javascript"]`` and the
    pattern ``eval(...)``. Missing parent directories of ``path`` are created.
    """
    payload = {"rules": []}
    for rule_id in rule_ids:
        payload["rules"].append(
            {"id": rule_id, "languages": ["javascript"], "pattern": "eval(...)"}
        )
    serialized = json.dumps(payload)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(serialized, encoding="utf-8")
13+
14+
15+
def _write_custom_rules_file(path: Path, rule_ids: list[str]) -> None:
    """Write a YAML rules file containing one rule per entry of ``rule_ids``.

    Every rule carries the given ID, the pattern ``eval(...)``, the languages
    ``[javascript, typescript]``, a ``Rule <id>`` message and severity
    ``ERROR``. Missing parent directories of ``path`` are created. The file is
    written without a trailing newline.
    """
    lines = ["rules:"]
    for rid in rule_ids:
        # The keys of a rule must be indented past its "- " marker so they
        # stay inside the same list item; at the dash's own column the
        # document is not well-formed YAML.
        lines.extend(
            [
                f"  - id: {rid}",
                "    pattern: eval(...)",
                "    languages: [javascript, typescript]",
                f"    message: Rule {rid}",
                "    severity: ERROR",
            ]
        )
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text("\n".join(lines), encoding="utf-8")
29+
30+
31+
def _mock_subprocess_run(monkeypatch, captured_cmd: list[str]):
    """Replace the scanner module's ``subprocess.run`` with a recording stand-in.

    The stand-in appends the invoked command to ``captured_cmd``, writes an
    empty ``{"results": []}`` JSON document to the path that follows the
    ``--output`` argument, and returns an object reporting a zero return code
    with empty stdout/stderr.
    """
    empty_report = json.dumps({"results": []})

    def _runner(cmd, capture_output, text):
        captured_cmd.extend(cmd)
        # The report path is the argument right after the "--output" flag.
        output_position = cmd.index("--output") + 1
        Path(cmd[output_position]).write_text(empty_report, encoding="utf-8")
        return SimpleNamespace(stdout="", stderr="", returncode=0)

    monkeypatch.setattr("socket_basics.core.connector.opengrep.subprocess.run", _runner)
39+
40+
41+
def test_scan_uses_custom_rule_file_when_available(tmp_path, monkeypatch):
    """Scan command references a ``socket_custom_rules_`` file, not the bundled one.

    A custom rules file exists under the configured custom path and custom
    SAST rules are enabled, so the captured command must not contain the
    bundled ``javascript_typescript.yml`` path.
    """
    workspace = tmp_path / "workspace"
    workspace.mkdir(parents=True, exist_ok=True)

    # Custom file can be any yaml name; builder groups by languages.
    _write_custom_rules_file(workspace / ".socket" / "rules" / "org-rules.yml", ["org.no-eval"])

    bundled_rules_dir = tmp_path / "bundled-rules"
    bundled_file = bundled_rules_dir / "javascript_typescript.yml"
    _write_rule_file(bundled_file, ["js-default-rule"])

    settings = {
        "workspace": str(workspace),
        "output_dir": str(workspace),
        "javascript_sast_enabled": True,
        "use_custom_sast_rules": True,
        "custom_sast_rule_path": ".socket/rules",
        "opengrep_rules_dir": str(bundled_rules_dir),
        "all_languages_enabled": False,
        "all_rules_enabled": False,
        "verbose": False,
    }
    scanner = OpenGrepScanner(Config(settings))
    # Stub out post-processing so only command construction is exercised.
    scanner._convert_to_socket_facts = lambda _: {"components": []}
    scanner.generate_notifications = lambda _: {}

    captured_cmd: list[str] = []
    _mock_subprocess_run(monkeypatch, captured_cmd)
    scanner.scan()

    joined = " ".join(captured_cmd)
    assert "socket_custom_rules_" in joined
    assert str(bundled_file) not in joined
77+
78+
79+
def test_scan_falls_back_to_bundled_file_when_custom_missing(tmp_path, monkeypatch):
    """Scan command contains the bundled rule file when no custom rules exist.

    Custom SAST rules are enabled but the configured custom path is never
    created in the workspace.
    """
    workspace = tmp_path / "workspace"
    workspace.mkdir(parents=True, exist_ok=True)

    bundled_rules_dir = tmp_path / "bundled-rules"
    bundled_file = bundled_rules_dir / "javascript_typescript.yml"
    _write_rule_file(bundled_file, ["js-default-rule"])

    settings = {
        "workspace": str(workspace),
        "output_dir": str(workspace),
        "javascript_sast_enabled": True,
        "use_custom_sast_rules": True,
        "custom_sast_rule_path": ".socket/missing-rules",
        "opengrep_rules_dir": str(bundled_rules_dir),
        "all_languages_enabled": False,
        "all_rules_enabled": False,
        "verbose": False,
    }
    scanner = OpenGrepScanner(Config(settings))
    # Stub out post-processing so only command construction is exercised.
    scanner._convert_to_socket_facts = lambda _: {"components": []}
    scanner.generate_notifications = lambda _: {}

    captured_cmd: list[str] = []
    _mock_subprocess_run(monkeypatch, captured_cmd)
    scanner.scan()

    joined = " ".join(captured_cmd)
    assert str(bundled_file) in joined
109+
110+
111+
def test_custom_rules_ignore_nonmatching_bundled_allowlist_ids(tmp_path, monkeypatch):
    """No custom rule is excluded when the enabled-rule list matches none of them.

    The configured ``javascript_enabled_rules`` value names only a bundled
    rule ID, while the custom file defines two different IDs; the captured
    command must carry no ``--exclude-rule`` for either custom ID.
    """
    workspace = tmp_path / "workspace"
    workspace.mkdir(parents=True, exist_ok=True)

    custom_ids = ["org.no-eval", "org.no-innerhtml"]
    _write_custom_rules_file(workspace / ".socket" / "rules" / "org-rules.yml", custom_ids)

    bundled_rules_dir = tmp_path / "bundled-rules"
    _write_rule_file(bundled_rules_dir / "javascript_typescript.yml", ["js-default-rule"])

    settings = {
        "workspace": str(workspace),
        "output_dir": str(workspace),
        "javascript_sast_enabled": True,
        "javascript_enabled_rules": "js-default-rule",
        "use_custom_sast_rules": True,
        "custom_sast_rule_path": ".socket/rules",
        "opengrep_rules_dir": str(bundled_rules_dir),
        "all_languages_enabled": False,
        "all_rules_enabled": False,
        "verbose": False,
    }
    scanner = OpenGrepScanner(Config(settings))
    # Stub out post-processing so only command construction is exercised.
    scanner._convert_to_socket_facts = lambda _: {"components": []}
    scanner.generate_notifications = lambda _: {}

    captured_cmd: list[str] = []
    _mock_subprocess_run(monkeypatch, captured_cmd)
    scanner.scan()

    joined = " ".join(captured_cmd)
    assert "socket_custom_rules_" in joined
    assert "--exclude-rule org.no-eval" not in joined
    assert "--exclude-rule org.no-innerhtml" not in joined
147+
148+
149+
def test_custom_rules_apply_allowlist_when_custom_ids_match(tmp_path, monkeypatch):
    """Enabled-rule list is honoured when it names one of the custom rule IDs.

    ``javascript_enabled_rules`` names ``org.no-eval``; the captured command
    must exclude the other custom rule (``org.no-innerhtml``) and must not
    exclude ``org.no-eval``.
    """
    workspace = tmp_path / "workspace"
    workspace.mkdir(parents=True, exist_ok=True)

    custom_ids = ["org.no-eval", "org.no-innerhtml"]
    _write_custom_rules_file(workspace / ".socket" / "rules" / "org-rules.yml", custom_ids)

    bundled_rules_dir = tmp_path / "bundled-rules"
    _write_rule_file(bundled_rules_dir / "javascript_typescript.yml", ["js-default-rule"])

    settings = {
        "workspace": str(workspace),
        "output_dir": str(workspace),
        "javascript_sast_enabled": True,
        "javascript_enabled_rules": "org.no-eval",
        "use_custom_sast_rules": True,
        "custom_sast_rule_path": ".socket/rules",
        "opengrep_rules_dir": str(bundled_rules_dir),
        "all_languages_enabled": False,
        "all_rules_enabled": False,
        "verbose": False,
    }
    scanner = OpenGrepScanner(Config(settings))
    # Stub out post-processing so only command construction is exercised.
    scanner._convert_to_socket_facts = lambda _: {"components": []}
    scanner.generate_notifications = lambda _: {}

    captured_cmd: list[str] = []
    _mock_subprocess_run(monkeypatch, captured_cmd)
    scanner.scan()

    joined = " ".join(captured_cmd)
    assert "--exclude-rule org.no-innerhtml" in joined
    assert "--exclude-rule org.no-eval" not in joined

0 commit comments

Comments
 (0)