Skip to content

Commit 4abbff5

Browse files
authored
PR #3: Add Python DSL API for Argument Matching (#386)
This introduces the Python DSL API surface for specifying argument constraints in security rules. Rule authors can now use match_name and match_position parameters in the calls matcher to define expected keyword and positional argument values. The DSL automatically generates the appropriate JSON IR structures that the Go executor uses for validation during code analysis.
1 parent a7b2db0 commit 4abbff5

File tree

2 files changed

+299
-9
lines changed

2 files changed

+299
-9
lines changed

python-dsl/codepathfinder/matchers.py

Lines changed: 84 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,36 @@
44
These matchers generate JSON IR for the Go executor.
55
"""
66

7+
from typing import Dict, Optional, Union, List, Any
78
from .ir import IRType
89

10+
ArgumentValue = Union[str, int, float, bool, List[Union[str, int, float, bool]]]
11+
912

1013
class CallMatcher:
1114
"""
12-
Matches function/method calls in the callgraph.
15+
Matches function/method calls with optional argument constraints.
1316
1417
Examples:
1518
calls("eval") # Exact match
1619
calls("eval", "exec") # Multiple patterns
1720
calls("request.*") # Wildcard (any request.* call)
1821
calls("*.json") # Wildcard (any *.json call)
22+
calls("app.run", match_name={"debug": True}) # Keyword argument matching
23+
calls("socket.bind", match_position={0: "0.0.0.0"}) # Positional argument matching
1924
"""
2025

21-
def __init__(self, *patterns: str):
26+
def __init__(
27+
self,
28+
*patterns: str,
29+
match_position: Optional[Dict[int, ArgumentValue]] = None,
30+
match_name: Optional[Dict[str, ArgumentValue]] = None,
31+
):
2232
"""
2333
Args:
2434
*patterns: Function names to match. Supports wildcards (*).
35+
match_position: Constraints on positional arguments, keyed by argument index {position: value}. A value may be a single literal or a list of alternative literals.
36+
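match_name: Constraints on named/keyword arguments, keyed by argument name {name: value}. A value may be a single literal or a list of alternative literals.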
2537
2638
Raises:
2739
ValueError: If no patterns provided or pattern is empty
@@ -34,6 +46,31 @@ def __init__(self, *patterns: str):
3446

3547
self.patterns = list(patterns)
3648
self.wildcard = any("*" in p for p in patterns)
49+
self.match_position = match_position or {}
50+
self.match_name = match_name or {}
51+
52+
def _make_constraint(self, value: ArgumentValue) -> Dict[str, Any]:
53+
"""
54+
Create an argument constraint from a value.
55+
56+
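Automatically detects wildcard characters ("*" or "?") in string values, including strings inside a list. The flag is also set when the function pattern itself contains a wildcard.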
57+
58+
Args:
59+
value: The argument value or list of values
60+
61+
Returns:
62+
Dictionary with 'value' and 'wildcard' keys
63+
"""
64+
# Check if wildcard characters are present in string values
65+
has_wildcard = False
66+
if isinstance(value, str) and ("*" in value or "?" in value):
67+
has_wildcard = True
68+
elif isinstance(value, list):
69+
has_wildcard = any(
70+
isinstance(v, str) and ("*" in v or "?" in v) for v in value
71+
)
72+
73+
return {"value": value, "wildcard": has_wildcard or self.wildcard}
3774

3875
def to_ir(self) -> dict:
3976
"""
@@ -44,16 +81,34 @@ def to_ir(self) -> dict:
4481
"type": "call_matcher",
4582
"patterns": ["eval", "exec"],
4683
"wildcard": false,
47-
"match_mode": "any" # matches if ANY pattern matches
84+
"matchMode": "any",
85+
"keywordArgs": { "debug": {"value": true, "wildcard": false} },
86+
"positionalArgs": { "0": {"value": "0.0.0.0", "wildcard": false} }
4887
}
4988
"""
50-
return {
89+
ir = {
5190
"type": IRType.CALL_MATCHER.value,
5291
"patterns": self.patterns,
5392
"wildcard": self.wildcard,
54-
"match_mode": "any",
93+
"matchMode": "any",
5594
}
5695

96+
# Add positional argument constraints
97+
if self.match_position:
98+
ir["positionalArgs"] = {
99+
str(pos): self._make_constraint(value)
100+
for pos, value in self.match_position.items()
101+
}
102+
103+
# Add keyword argument constraints
104+
if self.match_name:
105+
ir["keywordArgs"] = {
106+
name: self._make_constraint(value)
107+
for name, value in self.match_name.items()
108+
}
109+
110+
return ir
111+
57112
def __repr__(self) -> str:
58113
patterns_str = ", ".join(f'"{p}"' for p in self.patterns)
59114
return f"calls({patterns_str})"
@@ -105,12 +160,18 @@ def __repr__(self) -> str:
105160

106161

107162
# Public API
108-
def calls(*patterns: str) -> CallMatcher:
163+
def calls(
164+
*patterns: str,
165+
match_position: Optional[Dict[int, ArgumentValue]] = None,
166+
match_name: Optional[Dict[str, ArgumentValue]] = None,
167+
) -> CallMatcher:
109168
"""
110-
Create a matcher for function/method calls.
169+
Create a matcher for function/method calls with optional argument constraints.
111170
112171
Args:
113172
*patterns: Function names to match (supports wildcards)
173+
match_position: Match positional arguments by index {position: value}
174+
match_name: Match named/keyword arguments {name: value}
114175
115176
Returns:
116177
CallMatcher instance
@@ -124,8 +185,23 @@ def calls(*patterns: str) -> CallMatcher:
124185
125186
>>> calls("urllib.*")
126187
calls("urllib.*")
188+
189+
>>> calls("app.run", match_name={"debug": True})
190+
calls("app.run")
191+
192+
>>> calls("socket.bind", match_position={0: "0.0.0.0"})
193+
calls("socket.bind")
194+
195+
>>> calls("yaml.load", match_position={1: ["Loader", "UnsafeLoader"]})
196+
calls("yaml.load")
197+
198+
>>> calls("chmod", match_position={1: "0o7*"})
199+
calls("chmod")
200+
201+
>>> calls("app.run", match_position={0: "localhost"}, match_name={"debug": True})
202+
calls("app.run")
127203
"""
128-
return CallMatcher(*patterns)
204+
return CallMatcher(*patterns, match_position=match_position, match_name=match_name)
129205

130206

131207
def variable(pattern: str) -> VariableMatcher:

python-dsl/tests/test_matchers.py

Lines changed: 215 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def test_to_ir(self):
5555
assert ir["type"] == "call_matcher"
5656
assert ir["patterns"] == ["eval", "exec"]
5757
assert ir["wildcard"] is False
58-
assert ir["match_mode"] == "any"
58+
assert ir["matchMode"] == "any"
5959

6060
def test_to_ir_wildcard(self):
6161
"""Test CallMatcher.to_ir() with wildcard."""
@@ -126,3 +126,217 @@ def test_repr(self):
126126
"""Test VariableMatcher.__repr__() output."""
127127
matcher = variable("user_input")
128128
assert repr(matcher) == 'variable("user_input")'
129+
130+
131+
class TestCallMatcherKeywordArguments:
132+
"""Test suite for keyword argument matching (match_name)."""
133+
134+
def test_single_keyword_arg_string(self):
135+
"""Test matching single keyword argument with string value."""
136+
matcher = calls("app.run", match_name={"debug": "True"})
137+
ir = matcher.to_ir()
138+
139+
assert "keywordArgs" in ir
140+
assert "debug" in ir["keywordArgs"]
141+
assert ir["keywordArgs"]["debug"]["value"] == "True"
142+
assert ir["keywordArgs"]["debug"]["wildcard"] is False
143+
144+
def test_single_keyword_arg_boolean(self):
145+
"""Test matching keyword argument with boolean value."""
146+
matcher = calls("app.run", match_name={"debug": True})
147+
ir = matcher.to_ir()
148+
149+
assert ir["keywordArgs"]["debug"]["value"] is True
150+
151+
def test_single_keyword_arg_number(self):
152+
"""Test matching keyword argument with numeric value."""
153+
matcher = calls("app.listen", match_name={"port": 8080})
154+
ir = matcher.to_ir()
155+
156+
assert ir["keywordArgs"]["port"]["value"] == 8080
157+
158+
def test_multiple_keyword_args(self):
159+
"""Test matching multiple keyword arguments."""
160+
matcher = calls(
161+
"app.run", match_name={"host": "0.0.0.0", "port": 5000, "debug": True}
162+
)
163+
ir = matcher.to_ir()
164+
165+
assert len(ir["keywordArgs"]) == 3
166+
assert ir["keywordArgs"]["host"]["value"] == "0.0.0.0"
167+
assert ir["keywordArgs"]["port"]["value"] == 5000
168+
assert ir["keywordArgs"]["debug"]["value"] is True
169+
170+
def test_keyword_arg_or_logic(self):
171+
"""Test matching keyword argument with multiple values (OR logic)."""
172+
matcher = calls(
173+
"yaml.load", match_name={"Loader": ["Loader", "UnsafeLoader", "FullLoader"]}
174+
)
175+
ir = matcher.to_ir()
176+
177+
assert isinstance(ir["keywordArgs"]["Loader"]["value"], list)
178+
assert len(ir["keywordArgs"]["Loader"]["value"]) == 3
179+
180+
181+
class TestCallMatcherPositionalArguments:
182+
"""Test suite for positional argument matching (match_position)."""
183+
184+
def test_single_positional_arg(self):
185+
"""Test matching single positional argument."""
186+
matcher = calls("socket.bind", match_position={0: "0.0.0.0"})
187+
ir = matcher.to_ir()
188+
189+
assert "positionalArgs" in ir
190+
assert "0" in ir["positionalArgs"] # JSON keys are strings
191+
assert ir["positionalArgs"]["0"]["value"] == "0.0.0.0"
192+
193+
def test_multiple_positional_args(self):
194+
"""Test matching multiple positional arguments."""
195+
matcher = calls("chmod", match_position={0: "/tmp/file", 1: 0o777})
196+
ir = matcher.to_ir()
197+
198+
assert len(ir["positionalArgs"]) == 2
199+
assert ir["positionalArgs"]["0"]["value"] == "/tmp/file"
200+
assert ir["positionalArgs"]["1"]["value"] == 0o777
201+
202+
def test_positional_arg_or_logic(self):
203+
"""Test matching positional argument with multiple values (OR logic)."""
204+
matcher = calls("open", match_position={1: ["w", "a", "w+", "a+"]})
205+
ir = matcher.to_ir()
206+
207+
assert isinstance(ir["positionalArgs"]["1"]["value"], list)
208+
assert len(ir["positionalArgs"]["1"]["value"]) == 4
209+
210+
211+
class TestCallMatcherCombinedArguments:
212+
"""Test suite for combined positional and keyword argument matching."""
213+
214+
def test_both_positional_and_keyword(self):
215+
"""Test matching both positional and keyword arguments."""
216+
matcher = calls(
217+
"app.run",
218+
match_position={0: "localhost"},
219+
match_name={"debug": True, "port": 5000},
220+
)
221+
ir = matcher.to_ir()
222+
223+
assert "positionalArgs" in ir
224+
assert "keywordArgs" in ir
225+
assert ir["positionalArgs"]["0"]["value"] == "localhost"
226+
assert ir["keywordArgs"]["debug"]["value"] is True
227+
assert ir["keywordArgs"]["port"]["value"] == 5000
228+
229+
230+
class TestCallMatcherWildcardMatching:
231+
"""Test suite for wildcard matching in argument values."""
232+
233+
def test_wildcard_in_string_value(self):
234+
"""Test automatic wildcard detection in string values."""
235+
matcher = calls("chmod", match_position={1: "0o7*"})
236+
ir = matcher.to_ir()
237+
238+
# Wildcard should be auto-detected from '*' in value
239+
assert ir["positionalArgs"]["1"]["wildcard"] is True
240+
241+
def test_wildcard_in_list_value(self):
242+
"""Test wildcard detection in list of values."""
243+
matcher = calls("open", match_position={1: ["w*", "a*"]})
244+
ir = matcher.to_ir()
245+
246+
assert ir["positionalArgs"]["1"]["wildcard"] is True
247+
248+
def test_explicit_wildcard_flag(self):
249+
"""Test that a wildcard in the function pattern propagates to argument constraints."""
250+
matcher = calls("app.*", match_name={"host": "192.168.1.1"})
251+
ir = matcher.to_ir()
252+
253+
assert ir["wildcard"] is True
254+
# Wildcard in function pattern propagates to argument constraints
255+
assert ir["keywordArgs"]["host"]["wildcard"] is True
256+
257+
258+
class TestCallMatcherBackwardCompatibility:
259+
"""Test suite for backward compatibility with existing rules."""
260+
261+
def test_no_arguments_specified(self):
262+
"""Test that rules without argument constraints still work."""
263+
matcher = calls("eval")
264+
ir = matcher.to_ir()
265+
266+
# Should not have argument constraint fields
267+
assert "positionalArgs" not in ir
268+
assert "keywordArgs" not in ir
269+
270+
def test_empty_argument_dicts(self):
271+
"""Test that empty argument dicts don't add IR fields."""
272+
matcher = calls("eval", match_position={}, match_name={})
273+
ir = matcher.to_ir()
274+
275+
# Empty dicts should not add fields
276+
assert "positionalArgs" not in ir
277+
assert "keywordArgs" not in ir
278+
279+
280+
class TestCallMatcherIRSerialization:
281+
"""Test suite for JSON serialization of generated IR."""
282+
283+
def test_complex_ir_serialization(self):
284+
"""Test that complex IR can be serialized to JSON."""
285+
import json
286+
287+
matcher = calls(
288+
"app.run",
289+
match_position={0: "0.0.0.0"},
290+
match_name={"debug": True, "port": 5000, "host": ["localhost", "0.0.0.0"]},
291+
)
292+
ir = matcher.to_ir()
293+
294+
# Should be JSON-serializable
295+
json_str = json.dumps(ir)
296+
reconstructed = json.loads(json_str)
297+
298+
assert reconstructed["type"] == "call_matcher"
299+
assert reconstructed["keywordArgs"]["debug"]["value"] is True
300+
301+
def test_special_values_serialization(self):
302+
"""Test serialization of an integer written as an octal literal."""
303+
import json
304+
305+
matcher = calls("chmod", match_position={1: 0o777})
306+
ir = matcher.to_ir()
307+
308+
json_str = json.dumps(ir)
309+
reconstructed = json.loads(json_str)
310+
311+
# The octal literal 0o777 is a plain int, so JSON serializes it as decimal 511
312+
assert reconstructed["positionalArgs"]["1"]["value"] == 511
313+
314+
315+
class TestCallMatcherEdgeCases:
316+
"""Test suite for edge cases and error conditions."""
317+
318+
def test_none_values_handled(self):
319+
"""Test that None match_name/match_position are handled."""
320+
matcher = calls("eval", match_name=None, match_position=None)
321+
ir = matcher.to_ir()
322+
323+
assert "keywordArgs" not in ir
324+
assert "positionalArgs" not in ir
325+
326+
def test_mixed_value_types(self):
327+
"""Test mixing different value types in same rule."""
328+
matcher = calls(
329+
"config.set",
330+
match_name={
331+
"timeout": 30, # int
332+
"enabled": True, # bool
333+
"host": "localhost", # string
334+
"retry": 5.5, # float
335+
},
336+
)
337+
ir = matcher.to_ir()
338+
339+
assert ir["keywordArgs"]["timeout"]["value"] == 30
340+
assert ir["keywordArgs"]["enabled"]["value"] is True
341+
assert ir["keywordArgs"]["host"]["value"] == "localhost"
342+
assert ir["keywordArgs"]["retry"]["value"] == 5.5

0 commit comments

Comments
 (0)