[Python] Tracking only the most updated data flow #20656

Answered by owen-mc
elManto asked this question in Q&A

Hi Everyone,

Probably a silly question, but I couldn't find much about this. I need to track data flow in Python scripts. The goal is to track the expressions that flow into an open() call. For now I'm using this sample query:

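import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import semmle.python.Concepts

module FilesystemTaint implements DataFlow::ConfigSig {
  // Every expression is treated as a potential source.
  predicate isSource(DataFlow::Node source) { exists(Expr e | source.asExpr() = e) }

  // The sink is the first argument of a call that is also a path argument of a file system access.
  predicate isSink(DataFlow::Node sink) {
    exists(DataFlow::CallCfgNode call, FileSystemAccess fs |
      sink = call.getArg(0) and call.getArg(0) = fs.getAPathArgument()
    )
  }
}

module MyFlow = TaintTracking::Global<FilesystemTaint>;

from DataFlow::Node source, DataFlow::Node sink
where MyFlow::flow(source, sink)
select source, sink, source.getLocation(), sink.getLocation()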

And this sample testcase:

import sys

def get_input_from_stdin():
    return sys.argv[1]

def get_hardcoded_filename():
    return "output.txt"

def combine_input_and_hardcoded():
    user_input = get_input_from_stdin()
    return f"{user_input}_log.txt"

def intermediate_variable():
    part1 = get_input_from_stdin()
    part2 = "_data"
    filename = part1 + part2 + ".txt"
    return filename

def filename_from_multiple_functions():
    prefix = get_input_from_stdin()
    suffix = get_hardcoded_filename()
    return prefix + "_" + suffix

def nested_function_call():
    def inner():
        return get_input_from_stdin()
    filename = inner() + "_nested.txt"
    return filename

def custom_open(filepath):
    with open(filepath, 'w') as f:
        f.write("This is a test file.\n")

def main():
    custom_open(get_hardcoded_filename())
    custom_open(combine_input_and_hardcoded())
    custom_open(intermediate_variable())
    custom_open(filename_from_multiple_functions())
    custom_open(nested_function_call())

if __name__ == "__main__":
    main()

As expected, CodeQL returns one path for each source Expr. However, I would like to keep only the most updated one. For instance, for the function intermediate_variable(), the results report that all the expressions flow into the open() call, namely part1, part2, get_input_from_stdin(), part1 + part2, and the literals. I would like to keep only the most updated source, which is filename.

Obviously I could solve this by checking line numbers, but that's not going to fly for slightly more complex scenarios. Another solution I was considering is a second data flow analysis that checks intra-procedural flows among local expressions, but before doing that I'm curious whether there's a more "CodeQL"-like way to do this. Thanks.


👋 @elManto

As far as I can tell, what you are seeing is a result of how you defined isSource, where you marked every expression as a source. This means that CodeQL will show you all paths from any expression to a file system access, which in the intermediate_variable function does indeed mean all of the expressions in that function.

Once you restrict isSource to what you really want to mark as a source of tainted data, you will get more sensible results. In taint flow analysis, there is no such thing as a "most updated source": there are original sources, sinks, and nodes on the paths from the former to the latter. In these terms, filename probably won't be a source, but rather a node on the path from the source to the sink.

For example, if you restrict isSource to sys.argv elements (provided you're using global taint flow, which I think you already are), then you'll get one path through intermediate_variable, roughly consisting of

sys.argv[1] -> return -> get_input_from_stdin() ->
 part1 -> part1 + part2 + ".txt" -> filename -> return ->
 intermediate_variable() -> custom_open arg
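
To make the effect of the source definition concrete, here is a toy Python model of the situation. It is only a sketch: the flow steps are written by hand for intermediate_variable() (CodeQL derives the real graph itself and includes more nodes, such as the return steps), and the names find_path and reported_sources are made up for this illustration.

```python
from collections import deque

# Hypothetical, hand-written flow steps for intermediate_variable().
EDGES = {
    "sys.argv[1]": ["get_input_from_stdin()"],
    "get_input_from_stdin()": ["part1"],
    "part1": ['part1 + part2 + ".txt"'],
    '"_data"': ["part2"],
    "part2": ['part1 + part2 + ".txt"'],
    'part1 + part2 + ".txt"': ["filename"],
    "filename": ["intermediate_variable()"],
    "intermediate_variable()": ["custom_open arg"],
}
SINK = "custom_open arg"


def find_path(source, sink, edges):
    """Breadth-first search: return one path from source to sink, or None."""
    queue = deque([[source]])
    seen = {source}
    while queue:
        path = queue.popleft()
        if path[-1] == sink:
            return path
        for nxt in edges.get(path[-1], []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(path + [nxt])
    return None


def reported_sources(sources, sink, edges):
    """Sources that have at least one path to the sink (one result row each)."""
    return [s for s in sources if find_path(s, sink, edges) is not None]


every_expression = list(EDGES)   # models isSource matching any expression
only_argv = ["sys.argv[1]"]      # models isSource restricted to sys.argv elements

all_rows = reported_sources(every_expression, SINK, EDGES)
argv_rows = reported_sources(only_argv, SINK, EDGES)
print(len(all_rows), "rows with every expression as a source")
print(len(argv_rows), "row with sys.argv[1] as the only source")
print(" -> ".join(find_path("sys.argv[1]", SINK, EDGES)))
```

With every node marked as a source, each of them gets its own result row; with the restricted source, a single path remains and filename shows up only as an intermediate node on it.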

Hi @redsun82, thanks for your input.

I understand what you're saying, but the problem is that I don't want to track only the user input coming from sys.argv. In your example I would end up with sys.argv[1] as the source, whereas I would like to obtain part1 + part2 + ".txt", because that's the actual path being accessed.

Second, the input flowing into custom_open(arg) might come from a source that is a concatenation of strings that don't necessarily depend on sys.argv (e.g., they might be concatenated with hardcoded strings or other functions' return values). For my project I would like to have all of those "concatenated" strings; that's why I tried to use every Expr as a source, in an attempt to capture all the possible sources.


Try adding this to FilesystemTaint:

  predicate isBarrierIn(DataFlow::Node node) { isSource(node) }

What this means is that source nodes also block any taint from flowing into them. So if you used to have a path "source1 -> source2 -> source3 -> sink", with this change it will only give you "source3 -> sink".
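
The "source1 -> source2 -> source3 -> sink" example can be played through with a small Python model. This is a sketch under assumptions, not how CodeQL implements barriers: the graph is a hand-written chain, and paths_to_sink with its barrier_in parameter is a hypothetical helper that simply refuses to step into a barrier node.

```python
def paths_to_sink(edges, sources, sink, barrier_in=frozenset()):
    """Enumerate source-to-sink paths in an acyclic toy graph.

    A path may START at a node in barrier_in, but it may never step INTO one.
    """
    results = []

    def walk(path):
        node = path[-1]
        if node == sink:
            results.append(path)
            return
        for nxt in edges.get(node, []):
            if nxt in barrier_in:
                continue  # flow into a barrier node is blocked
            walk(path + [nxt])

    for source in sources:
        walk([source])
    return results


# The chain from the explanation above; all three "source" nodes are sources.
EDGES = {"source1": ["source2"], "source2": ["source3"], "source3": ["sink"]}
SOURCES = ["source1", "source2", "source3"]

without_barrier = paths_to_sink(EDGES, SOURCES, "sink")
with_barrier = paths_to_sink(EDGES, SOURCES, "sink", barrier_in=frozenset(SOURCES))

for path in without_barrier:
    print("no barrier:  ", " -> ".join(path))
for path in with_barrier:
    print("with barrier:", " -> ".join(path))
```

Without the barrier, each of the three sources yields its own path to the sink. With it, only the path that starts at the source closest to the sink survives. Note that in this model the sink itself is not a source.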

Another option is to use DataFlow::Global instead of TaintTracking::Global. Then you will only get expressions that actually flow to the sink, without being modified in some way first.
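
The difference between the two modes can be sketched as a toy Python model. The assumptions here are mine: each flow step for intermediate_variable() is labelled by hand as either "value" (the value is passed on unchanged) or "taint" (a new value is derived, as in string concatenation), and flows_to is a made-up helper. Plain data flow is modelled as following only "value" steps, taint tracking as following both kinds.

```python
# (from, to, kind) steps, hand-written for intermediate_variable().
STEPS = [
    ("sys.argv[1]", "get_input_from_stdin()", "value"),
    ("get_input_from_stdin()", "part1", "value"),
    ('"_data"', "part2", "value"),
    ("part1", "part1 + part2", "taint"),
    ("part2", "part1 + part2", "taint"),
    ("part1 + part2", 'part1 + part2 + ".txt"', "taint"),
    ('".txt"', 'part1 + part2 + ".txt"', "taint"),
    ('part1 + part2 + ".txt"', "filename", "value"),
    ("filename", "intermediate_variable()", "value"),
    ("intermediate_variable()", "filepath", "value"),
]
SINK = "filepath"  # the argument of open() inside custom_open


def flows_to(sink, steps, kinds):
    """Nodes that reach the sink using only steps of the given kinds.

    The sink itself is included, since a node trivially flows to itself.
    """
    reached = {sink}
    changed = True
    while changed:
        changed = False
        for src, dst, kind in steps:
            if kind in kinds and dst in reached and src not in reached:
                reached.add(src)
                changed = True
    return reached


taint_sources = flows_to(SINK, STEPS, {"value", "taint"})
dataflow_sources = flows_to(SINK, STEPS, {"value"})

print(len(taint_sources), "nodes reach the sink with taint steps included")
print(sorted(dataflow_sources))
```

In this model, value-only flow stops at the result of the concatenation, so part1, part2 and the literals drop out, while the full concatenated expression and the nodes after it remain.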

Answer selected by elManto

Thanks, moving to DataFlow::Global did most of the job :)
