-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #133 from AikidoSec/AIK-3527
Add support for psycopg
- Loading branch information
Showing
6 changed files
with
218 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
""" | ||
Sink module for `psycopg` | ||
""" | ||
|
||
import copy | ||
import importhook | ||
from aikido_firewall.vulnerabilities.sql_injection.dialects import Postgres | ||
from aikido_firewall.background_process.packages import add_wrapped_package | ||
import aikido_firewall.vulnerabilities as vulns | ||
|
||
|
||
@importhook.on_import("psycopg.cursor") | ||
def on_psycopg_import(psycopg): | ||
""" | ||
Hook 'n wrap on `psycopg.connect` function, we modify the cursor_factory | ||
of the result of this connect function. | ||
""" | ||
modified_psycopg = importhook.copy_module(psycopg) | ||
former_copy_funtcion = copy.deepcopy(psycopg.Cursor.copy) | ||
former_execute_function = copy.deepcopy(psycopg.Cursor.execute) | ||
former_executemany_function = copy.deepcopy(psycopg.Cursor.executemany) | ||
|
||
def aikido_copy(self, statement, params=None, *args, **kwargs): | ||
sql = statement | ||
vulns.run_vulnerability_scan( | ||
kind="sql_injection", op="psycopg.Cursor.copy", args=(sql, Postgres()) | ||
) | ||
return former_copy_funtcion(self, statement, params, *args, **kwargs) | ||
|
||
def aikido_execute(self, query, params=None, *args, **kwargs): | ||
sql = query | ||
vulns.run_vulnerability_scan( | ||
kind="sql_injection", op="psycopg.Cursor.execute", args=(sql, Postgres()) | ||
) | ||
return former_execute_function(self, query, params, *args, **kwargs) | ||
|
||
def aikido_executemany(self, query, params_seq): | ||
args = (query, Postgres()) | ||
op = "psycopg.Cursor.executemany" | ||
vulns.run_vulnerability_scan(kind="sql_injection", op=op, args=args) | ||
return former_executemany_function(self, query, params_seq) | ||
|
||
setattr(psycopg.Cursor, "copy", aikido_copy) # pylint: disable=no-member | ||
setattr(psycopg.Cursor, "execute", aikido_execute) # pylint: disable=no-member | ||
# pylint: disable=no-member | ||
setattr(psycopg.Cursor, "executemany", aikido_executemany) | ||
|
||
add_wrapped_package("psycopg") | ||
return modified_psycopg |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
import pytest | ||
from unittest.mock import patch | ||
import aikido_firewall.sinks.psycopg | ||
from aikido_firewall.background_process.comms import reset_comms | ||
from aikido_firewall.vulnerabilities.sql_injection.dialects import Postgres | ||
|
||
|
||
@pytest.fixture | ||
def database_conn(): | ||
import psycopg | ||
|
||
return psycopg.connect( | ||
host="127.0.0.1", user="user", password="password", dbname="db" | ||
) | ||
|
||
|
||
def test_cursor_execute(database_conn): | ||
reset_comms() | ||
with patch( | ||
"aikido_firewall.vulnerabilities.run_vulnerability_scan" | ||
) as mock_run_vulnerability_scan: | ||
cursor = database_conn.cursor() | ||
query = "SELECT * FROM dogs" | ||
cursor.execute(query) | ||
|
||
called_with = mock_run_vulnerability_scan.call_args[1] | ||
assert called_with["args"][0] == query | ||
assert isinstance(called_with["args"][1], Postgres) | ||
assert called_with["op"] == "psycopg.Cursor.execute" | ||
assert called_with["kind"] == "sql_injection" | ||
mock_run_vulnerability_scan.assert_called_once() | ||
|
||
cursor.fetchall() | ||
database_conn.close() | ||
mock_run_vulnerability_scan.assert_called_once() | ||
|
||
|
||
def test_cursor_execute_parameterized(database_conn): | ||
reset_comms() | ||
cursor = database_conn.cursor() | ||
query = "SELECT * FROM dogs WHERE dog_name = %s" | ||
params = ("Fido",) | ||
with patch( | ||
"aikido_firewall.vulnerabilities.run_vulnerability_scan" | ||
) as mock_run_vulnerability_scan: | ||
cursor.execute(query, params) | ||
|
||
called_with = mock_run_vulnerability_scan.call_args[1] | ||
assert called_with["args"][0] == "SELECT * FROM dogs WHERE dog_name = %s" | ||
assert isinstance(called_with["args"][1], Postgres) | ||
assert called_with["op"] == "psycopg.Cursor.execute" | ||
assert called_with["kind"] == "sql_injection" | ||
mock_run_vulnerability_scan.assert_called_once() | ||
|
||
cursor.fetchall() | ||
database_conn.close() | ||
|
||
|
||
def test_cursor_executemany(database_conn): | ||
reset_comms() | ||
cursor = database_conn.cursor() | ||
query = "INSERT INTO dogs (dog_name, isadmin) VALUES (%s, %s)" | ||
params = [("doggo1", False), ("Rex", False), ("Buddy", True)] | ||
|
||
with patch( | ||
"aikido_firewall.vulnerabilities.run_vulnerability_scan" | ||
) as mock_run_vulnerability_scan: | ||
cursor.executemany(query, params) | ||
|
||
# Check the last call to run_vulnerability_scan | ||
called_with = mock_run_vulnerability_scan.call_args[1] | ||
assert ( | ||
called_with["args"][0] | ||
== "INSERT INTO dogs (dog_name, isadmin) VALUES (%s, %s)" | ||
) | ||
assert isinstance(called_with["args"][1], Postgres) | ||
assert called_with["op"] == "psycopg.Cursor.executemany" | ||
assert called_with["kind"] == "sql_injection" | ||
mock_run_vulnerability_scan.assert_called() | ||
|
||
database_conn.commit() | ||
database_conn.close() | ||
|
||
|
||
def test_cursor_copy(database_conn): | ||
reset_comms() | ||
cursor = database_conn.cursor() | ||
query = "COPY dogs FROM STDIN" | ||
|
||
with patch( | ||
"aikido_firewall.vulnerabilities.run_vulnerability_scan" | ||
) as mock_run_vulnerability_scan: | ||
cursor.copy(query) | ||
|
||
called_with = mock_run_vulnerability_scan.call_args[1] | ||
assert called_with["args"][0] == query | ||
assert isinstance(called_with["args"][1], Postgres) | ||
assert called_with["op"] == "psycopg.Cursor.copy" | ||
assert called_with["kind"] == "sql_injection" | ||
mock_run_vulnerability_scan.assert_called_once() | ||
|
||
database_conn.close() |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters