image-20210321215913936

NGF

实验 - 编程实现带参数执行模式防SQL注入


- 功能 -

  • 通过数据库访问的 prepare / bindparam / exec 模式访问数据库,防止通过将用户输入拼接到SQL语句执行可能导致的SQL注入。

- 要求 -

  • 可选用任意数据库和任意编程语言;
  • 能够演示拼接SQL语句导致SQL注入的情况;
  • 通过带参执行反SQL注入。

- 编程配置 -

  • 语言:PHP PYTHON GOLANG
  • 数据库:MYSQLv5.7.28
  • 系统:WIN10.0.19041

- 数据库测试结构 -

  • 数据库:hg
  • 表名:userinfo
  • 字段:userID userName userEmail userPass userSalt userLoginIP userLoginTime

其中,表的结构如下:

desc userinfo;
+---------------+--------------+------+-----+---------+----------------+
| Field         | Type         | Null | Key | Default | Extra          |
+---------------+--------------+------+-----+---------+----------------+
| userID        | int(11)      | NO   | PRI | NULL    | auto_increment |
| userName      | varchar(64)  | NO   |     | NULL    |                |
| userEmail     | varchar(128) | NO   |     | NULL    |                |
| userPass      | varchar(64)  | NO   |     | NULL    |                |
| userSalt      | varchar(32)  | NO   |     | NULL    |                |
| userLoginIP   | varchar(32)  | NO   |     | NULL    |                |
| userLoginTime | datetime     | NO   |     | NULL    |                |
+---------------+--------------+------+-----+---------+----------------+

- 实现 -

这里分别使用 PHP PYTHON GOLANG 去实现 prepare exec 以及 bindparam 模式,同时给出 绝对安全存在注入 的比较。

- PHP 实现 prepare 模式 -

首先,这里的主要危险点暂且定为关于数据库 INSERT 数据的注入。那么简单的做一个 用户注册 的前端,如图:

image-20210321163012138

此时大致处理逻辑为:

  • -> 获取用户传入注册信息
  • -> 检查邮箱和/用户名/个人昵称是否重复
  • -> 将hash加密的密码以及其他信息 INSERT 到数据库中

而关于检查是否重复这一部分,会涉及到 SELECT 查询,由于重点暂且为 INSERT 部分,有关 SELECT 就先定为 绝对安全 吧。然后会在 INSERT 布设 绝对安全存在注入 的对比。

- 代码实现 -

这个是关于配置的文件,文件名为 ./lib/config.php

<?php
class Config
{
	static public	$APP_SQL_HOST = '127.0.0.1';
	static public	$APP_SQL_PORT = 3306;
	static public	$APP_SQL_USER = 'root';
	static public	$APP_SQL_PASS = 'root';
	static public	$APP_SQL_DATABASE = 'hg';
	static public	$APP_SQL_TABLE_PRE = '';
}
?>

然后是关于数据库调用包装的文件,文件名为 ./lib/sql.php

<?php

require_once "./config.php";

# SQL -> MYSQL调用
class SQL
{
	static public $SQL_OBJ;
	function __construct()
	{
		self::$SQL_OBJ = new PDO("mysql:host=".Config::$APP_SQL_HOST.";port=".Config::$APP_SQL_PORT.";dbname=".Config::$APP_SQL_DATABASE.";charset=utf8", Config::$APP_SQL_USER, Config::$APP_SQL_PASS);
		if (!self::$SQL_OBJ) {
			self::$SQL_OBJ = null;
		}
		return $this->check();
	}

	# 检查数据库是否成功连接
	static function check()
	{
		if (is_null(self::$SQL_OBJ)) {
			die("Can't not connect SQL~");
		}
		return true;
	}

	# 查询格式
	static function queryFormat(array $array, String $pad = 'AND', $new_array = [])
	{
		foreach ($array as $key) {
			array_push($new_array, "`{$key}` = ? ");
		}
		return join(" {$pad} ", $new_array);
	}

	# 插入格式(安全)
	static function insertValueFormat(array $array)
	{

		return '(' . join(',', array_map(function ($v) {
			return ' ? ';
		}, $array)) . ')';
	}
	
	# 插入格式(注入)
	static function insertValueFormatInject(array $array)
	{

		return '(' . join(',', array_map(function ($v) {
			return "'{$v}'";
		}, $array)) . ')';
	}
	
	static function insertColumnFormat(array $array){
		# 安全
		foreach($array as $v){
			if(stripos($v,'`') !== false){
				return '';
			}
		}
		return '(`' . join('`,`',$array) . '`)';
	}
	

	# 查询方法(安全)
	static function query(String $table, array $conditions, String $extraString = '', $re = [])
	{
		if (self::check()) {
			try {
				if(count($conditions) > 0){
					$q = "SELECT * FROM `" . Config::$APP_SQL_TABLE_PRE . $table . "` WHERE " . self::queryFormat(array_keys($conditions))  . $extraString;
					$r = self::$SQL_OBJ->prepare($q);
					$r->execute(array_values($conditions));
					if ($g = $r->fetchall(PDO::FETCH_ASSOC)) {
						foreach ($g as $row) {
							array_push($re, $row);
						}
						return $re;
					}
				}else{
					
					$q = "SELECT * FROM `" . Config::$APP_SQL_TABLE_PRE . $table . '`' . $extraString;
					$r = self::$SQL_OBJ->query($q);
					
					foreach($r->fetchall(PDO::FETCH_ASSOC) as $row) {
						array_push($re, $row);
					}
					
					return $re;
					
				}
			} catch (Exception $_) {
				die('ERROR -> ' . $_);
			}
		}
	}
	# 插入方法(安全)
	static function insert(String $table, array $insert, String $extraString = '')
	{
		if (self::check()) {
			try {
				$q = "INSERT INTO `" . Config::$APP_SQL_TABLE_PRE . $table . "`" . self::insertColumnFormat(array_keys($insert)) . " VALUES " . self::insertValueFormat(array_values($insert))  . $extraString;
				$r = self::$SQL_OBJ->prepare($q);
				return $affected_rows = $r->execute(array_values($insert));
				
			} catch (Exception $_) {
				die('ERROR -> ' . $_);
			}
		}
	}
	
	# 插入方法(注入)
	static function insertInject(String $table, array $insert, String $extraString = '')
	{
		if (self::check()) {
			try {
				$q = "INSERT INTO `" . Config::$APP_SQL_TABLE_PRE . $table . "`" . self::insertColumnFormat(array_keys($insert)) . " VALUES " . self::insertValueFormatInject(array_values($insert))  . $extraString;
				$r = self::$SQL_OBJ->prepare($q);
				return $affected_rows = $r->execute();
			
			} catch (Exception $_) {
				die('ERROR -> ' . $_);
			}
		}
	}
}

至于查询部分的代码就先略过了。

- 注入分析 -

我们观察一下表和代码的 INSERT 语句,不难看出将会有 6 个字段被 INSERT, 其中语句为:

INSERT INTO `userinfo`(`userName`,`userEmail`,`userPass`,`userSalt`,`userLoginIP`,`userLoginTime`) VALUES ('[用户名]','[邮箱]','[密码hash]','[盐值]','[IP]','[最后登录时间]')

那么这里可以从 用户名 这里进行 2 种攻击方式的 注入攻击 ,分别是:

  • 基于时间/布尔盲注

其中,基于时间的盲注 payload 可以为如下:

# 猜解数据库用户名
123',if((select((mid(lpad(bin(ord(mid(user(),1,1))),7,0x30),1,1)))),sleep(1),[脏数据]),[脏数据],[脏数据],[脏数据],[脏数据]);#

实际上被数据库执行的为如下:

INSERT INTO `userinfo`(`userName`,`userEmail`,`userPass`,`userSalt`,`userLoginIP`,`userLoginTime`) VALUES ('123',if((select((mid(lpad(bin(ord(mid(user(),1,1))),7,0x30),1,1)))),sleep(1),[脏数据]),[脏数据],[脏数据],[脏数据],[脏数据]);#','[邮箱]','[密码hash]','[盐值]','[IP]','[最后登录时间]')

显然,注释符已经将后边的内容给去掉了,形成了一个合法的 INSERT 语句。并且由于这里使用了 [脏数据] ,也就是永远不会 注册成功 ,而当条件成立时也会触发 **sleep(1)**,那么就不会有很大的动静。

而基于布尔的盲注相较于基于时间的盲注是比较鸡肋的,大致的 payload 可以为如下:

# 才接数据库用户名
123',if((select((mid(lpad(bin(ord(mid(user(),1,1))),7,0x30),1,1)))),[脏数据],[合法数据]),[合法数据],[合法数据],[合法数据],[合法数据]);#

被数据库执行的为如下:

INSERT INTO `userinfo`(`userName`,`userEmail`,`userPass`,`userSalt`,`userLoginIP`,`userLoginTime`) VALUES ('123',if((select((mid(lpad(bin(ord(mid(user(),1,1))),7,0x30),1,1)))),[脏数据],[合法数据]),[合法数据],[合法数据],[合法数据],[合法数据]);#','[邮箱]','[密码hash]','[盐值]','[IP]','[最后登录时间]')

简单来说,即是当条件成立时就会将 [脏数据]INSERT 到字段中,此时 INSERT 操作是不能成功的,即会显示 注册失败 ;反之,如若条件不成立,那么将会把 [合法数据]INSERT 到字段中,此时 INSERT 操作是成功的,会显示 注册成功 。(鸡肋就在于一旦条件成立就会多添加一个用户,会有很大动静)

  • 增添多个用户

这个就比较简单,涉及到了关于 mysql 数据库中语句的连通性,比如可以构造一个 payload 如下:

# INSERT多个用户
123',[合法数据],[合法数据],[合法数据],[合法数据],[合法数据]),([第二个用户],[合法数据],[合法数据],[合法数据],[合法数据],[合法数据]),([第三个用户],[合法数据],[合法数据],[合法数据],[合法数据],[合法数据]);#

那么,被执行的结果如下:

INSERT INTO `userinfo`(`userName`,`userEmail`,`userPass`,`userSalt`,`userLoginIP`,`userLoginTime`) VALUES ('123',[合法数据],[合法数据],[合法数据],[合法数据],[合法数据]),([第二个用户],[合法数据],[合法数据],[合法数据],[合法数据],[合法数据]),([第三个用户],[合法数据],[合法数据],[合法数据],[合法数据],[合法数据]);#','[邮箱]','[密码hash]','[盐值]','[IP]','[最后登录时间]')

该注释的注释了,也就相当于一个合法的 INSERT 语句。

- 绝对安全 -

这里先来看一下关键代码,

# 插入格式(安全)
static function insertValueFormat(array $array)
{

	return '(' . join(',', array_map(function ($v) {
		return ' ? ';
	}, $array)) . ')';
}
static function insertColumnFormat(array $array){
	# 安全
	foreach($array as $v){
		if(stripos($v,'`') !== false){
			return '';
		}
	}
	return '(`' . join('`,`',$array) . '`)';
}

# 插入方法(安全)
static function insert(String $table, array $insert, String $extraString = '')
{
	if (self::check()) {
		try {
			$q = "INSERT INTO `" . Config::$APP_SQL_TABLE_PRE . $table . "`" . self::insertColumnFormat(array_keys($insert)) . " VALUES " . self::insertValueFormat(array_values($insert))  . $extraString;
			$r = self::$SQL_OBJ->prepare($q);
			return $affected_rows = $r->execute(array_values($insert));
			
		} catch (Exception $_) {
			die('ERROR -> ' . $_);
		}
	}
}

显然,这里使用了 prepare 模式,其中 预拼接的语句 为:

INSERT INTO `userinfo`(`userName`,`userEmail`,`userPass`,`userSalt`,`userLoginIP`,`userLoginTime`) VALUES ( ? , ? , ? , ? , ? , ? )

然后在下边的代码:

$r = self::$SQL_OBJ->prepare($q);
$r->execute(array_values($insert));

对上边的 预拼接的语句 当作 数据库语句 进行 预处理 后再将传入的参数填补进去了,此时用户输入的参数也就不会直接被当作 数据库语句 去执行了,那么注入也就可以被避免。

这里可以做一个即为简单的演示,比如使用时间盲注:

123',if(1,sleep(5),0),1,2,3,4);#

瞬间回显,无等待:

image-20210321171811339

再看一下数据库的内容:

image-20210321171905207

显然成功的被放进去了,也就是说是安全的了。

- 存在注入 -

接下来到存在注入了,先看一下关键代码:

# 插入格式(注入)
static function insertValueFormatInject(array $array)
{

	return '(' . join(',', array_map(function ($v) {
		return "'{$v}'";
	}, $array)) . ')';
}
static function insertColumnFormat(array $array){
	# 安全
	foreach($array as $v){
		if(stripos($v,'`') !== false){
			return '';
		}
	}
	return '(`' . join('`,`',$array) . '`)';
}
# 插入方法(注入)
static function insertInject(String $table, array $insert, String $extraString = '')
{
	if (self::check()) {
		try {
			$q = "INSERT INTO `" . Config::$APP_SQL_TABLE_PRE . $table . "`" . self::insertColumnFormat(array_keys($insert)) . " VALUES " . self::insertValueFormatInject(array_values($insert))  . $extraString;
			$r = self::$SQL_OBJ->prepare($q);
			return $affected_rows = $r->execute();
		
		} catch (Exception $_) {
			die('ERROR -> ' . $_);
		}
	}
}

这里虽然说使用了 prepare 进行预处理语句,然而 预拼接的语句 直接为:

INSERT INTO `userinfo`(`userName`,`userEmail`,`userPass`,`userSalt`,`userLoginIP`,`userLoginTime`) VALUES ([合法数据],[合法数据],[合法数据],[合法数据],[合法数据],[合法数据])

由于 预拼接的语句 会被当作 数据库语句 进行 预处理 ,而那么 预拼接的语句 中存在对用户输入参数的直接拼接,此时也就会存在注入了。

比如做一个简单演示,增添多个用户:

123','a@a.com','1','2','3','2000-01-01 00:00:00'),('321','a@aa,com','1','2','3','2000-01-01 00:00:00');#

这里是显示注册成功了:

image-20210321172838419

那么查看一下数据库:

image-20210321172856555

可以看到有多个用户被添加了,即成功进行了注入攻击。

- PYTHON 实现 exec 模式 -

那么,这里的主要危险点暂且定为关于数据库 SELECT 数据的注入,也就是在用户登录时的注入。可以简单做一个 用户登录 的前端:

image-20210321173150360

此时大致处理逻辑为:

  • -> 获取用户传入登录信息
  • -> 检查验证码、csrftoken等是否正确
  • -> 检查用户名/邮箱是否存在,且密码正确
  • -> 更新用户最后登录IP以及登录时间

显然于检查是用户名/邮箱以及密码这一块,会涉及到 SELECT 查询,而在后边的更新操作会涉及到 UPDATE ,由于重点暂且为 SELECT 部分,有关 UPDATE 就先定为 绝对安全 吧。然后会在 UPDATE 布设 绝对安全存在注入 的对比。

- 代码实现 -

这是配置文件,文件名为 ./globals/constant.py

""" --- 数据库 --- """
SQL_CONFIG = {
    "host":"localhost",
    "user":"root",
    "passwd":"root",
    "port":3306,
    "charset":"utf8"
}
SQL_DATABASE = "hg"
SQL_TABLE_PRE = ""
SQL_TABLES={
    "adminLogin" : SQL_TABLE_PRE + "userinfo",
}

关于数据库调用的包装,文件名为 ./module/SqlControl.py

import pymysql


class SQLConnect(object):
    """
        Todo: SQLConnect
        ---
        @host -> str `default:localhost`
        @user -> str `default:root`
        @passwd -> str `default:`
        @port -> int `default:3306`
        @charset -> str `default:utf8`
        ---

        @Return ||Dict||
        @

    """

    defaultDict = {
        "host": "localhost",
        "user": "root",
        "passwd": "",
        "port": 3306,
        "charset": "utf8"
    }

    db = None
    point = None
    isConnect = False

    def __init__(self, **interfaces):
        if interfaces is not None:
            for k, v in interfaces.items():
                self.defaultDict[k] = v if self.defaultDict[k] != v else self.defaultDict[k]

    def connect(self) -> bool:
        """连接数据库"""
        try:
            self.db = pymysql.connect(host=self.defaultDict["host"], user=self.defaultDict["user"],
                                      password=self.defaultDict["passwd"], charset=self.defaultDict["charset"],
                                      port=self.defaultDict["port"])
            self.point = self.db.cursor()
            self.isConnect = True
        except:
            return False
        return True

    def disconnect(self) -> bool:
        """关闭数据库连接"""
        if self.isConnect:
            try:
                self.db.close()
                self.point.close()
                self.isConnect = False
            except:
                return False
            return True
        else:
            return False

    @staticmethod
    def __insertColumnFormat(keys: list) -> str:
        return "(`" + "`,`".join(keys) + "`)"

    @staticmethod
    def __insertValueFormat(values: list) -> str:
        return "(" + ",".join(['%s' for i in range(len(values))]) + ")"

    @staticmethod
    def __queryColumn(keys:list) -> str:
        return "`" + "`,`".join(keys) + "`"

    @staticmethod
    def __queryForm(keys: list, pad: str = "AND") -> str:
        return f" {pad} ".join([f"`{each}` = %s" for each in keys])

    @staticmethod
    def __queryFormInject(data: dict, pad: str = "AND") -> str:
        return f" {pad} ".join([f"`{key}` = '{value}'" for key,value in data.items()])

    """
        Function: select_db
        ---
        @name -> str `default:`
        ---
        
        @Return ||Bool||
    """
    def select_db(self, name: str) -> bool:
        """选择数据库"""
        if self.isConnect:
            self.db.select_db(name)
            return True
        else:
            return False

    """
        Function: query
        ---
        @table -> str `default:`
        @columns -> list `default:`
        @conditions -> dict `default:`
        @extraString -> str `default:`
        ---
        
        @Return ||Tuple||
        @
    """
    def query(self,table: str,columns:list,conditions:dict,extraString:str,pad:str="AND") -> dict:
        """数据查询"""
        if self.isConnect:
            col = self.__queryColumn(columns) if len(columns) > 0 else "*"
            q = f"SELECT {col} FROM `{table}`" + " WHERE " + self.__queryForm(list(conditions.keys()),pad = pad) + extraString if len(conditions) > 0 else f"SELECT {col} FROM `{table}`" + extraString
            self.point.execute(query = q,args=tuple(conditions.values()))
            self.db.commit()
            return self.point.fetchall()
        else:
            return {}

    """
            Function: queryInject
            ---
            @table -> str `default:`
            @columns -> list `default:`
            @conditions -> dict `default:`
            @extraString -> str `default:`
            ---

            @Return ||Tuple||
            @
        """

    def queryInject(self, table: str, columns: list, conditions: dict, extraString: str, pad: str = "AND") -> dict:
        """数据查询"""
        if self.isConnect:
            col = self.__queryColumn(columns) if len(columns) > 0 else "*"
            q = f"SELECT {col} FROM `{table}`" + " WHERE " + self.__queryFormInject(conditions,pad=pad) + extraString if len(conditions) > 0 else f"SELECT {col} FROM `{table}`" + extraString
            self.point.execute(query=q)
            self.db.commit()
            return self.point.fetchall()
        else:
            return {}

    """
        Function: update
        ---
        @table -> str `default:`
        @update -> dict `default:`
        @condition -> dict `default:`
        @extraString -> str `default:`
        ---

        @Return ||Int||
        @
    """
    def update(self,table:str, update:dict,conditions:dict,extraString:str,pad:str="AND") -> int:
        """数据更新"""
        if self.isConnect:
            q = f"UPDATE `{table}` SET " + self.__queryForm(list(update.keys()),pad = ",") + " WHERE " + self.__queryForm(list(conditions.keys()),pad = pad) + extraString if len(conditions) > 0 else f"UPDATE `{table}` SET " + self.__queryForm(list(update.keys()),pad = ",") + extraString
            r = self.point.execute(query = q,args = tuple(list(update.values()) + list(conditions.values())))
            self.db.commit()
            return r
        else:
            return -1
        

有关查询部分的代码就先略过。

- 注入分析 -

这里来简单观察一下有关表和代码的 SELECT 语句,不难看出大致语句为:

# 先判断用户名/邮箱是否正确
SELECT * FROM `userinfo` WHERE `userName` = '[用户名]' OR `userEmail` = '[邮箱]'
# 若能选择得到,则再判断选择到的密码hash和输入的密码的hash是否一致

由于这是有关 用户登录 界面的注入,且再加上判断逻辑,可以从 用户名 这里进行 1 种攻击方式的 注入攻击

  • 基于时间盲注

毕竟基于布尔的盲注是不太可能的,因为现在普遍无论用户名还是密码错误都只会返回同样的错误提示,就没有区分点来进行布尔判断了。

以下是一个很简单 payload 进行时间盲注:

# 比如当形如substr被过滤时可以用left+right构成逐个字符盲注猜解
123'^if((select(right((left(lpad(bin(ord(right((left(user(),1)),1))),7,'0'),1)),1))),sleep(1),1)#

实际上被数据库执行的为如下:

SELECT * FROM `userinfo` WHERE (`userName` = '123'^if((select(right((left(lpad(bin(ord(right((left(user(),1)),1))),7,'0'),1)),1))),sleep(1),1)#' OR `userEmail` = '[邮箱]')

显然后边的语句全被注释了,那么也就能够执行咱们的盲注猜解语句了。

- 绝对安全 -

还是先来看一下关键代码:

   @staticmethod
   def __queryForm(keys: list, pad: str = "AND") -> str:
       return f" {pad} ".join([f"`{each}` = %s" for each in keys])
@staticmethod
   def __queryColumn(keys:list) -> str:
       return "`" + "`,`".join(keys) + "`"
def query(self,table: str,columns:list,conditions:dict,extraString:str,pad:str="AND") -> dict:
       """数据查询"""
       if self.isConnect:
           col = self.__queryColumn(columns) if len(columns) > 0 else "*"
           q = f"SELECT {col} FROM `{table}`" + " WHERE " + self.__queryForm(list(conditions.keys()),pad = pad) + extraString if len(conditions) > 0 else f"SELECT {col} FROM `{table}`" + extraString
           print(q)
           self.point.execute(query = q,args=tuple(conditions.values()))
           self.db.commit()
           return self.point.fetchall()
       else:
           return {}

不难看出,这里使用了 exec 模式,简单来说即是执行一个较为完整的 存储过程 ,其中 预存储的语句 为:

SELECT * FROM `userinfo` WHERE `userEmail` = %s OR `userName` = %s 

在下边的代码:

self.point.execute(query = q,args=tuple(conditions.values()))
self.db.commit()

还是会对将上边的 预存储的语句 当作 数据库语句 进行 预处理 后再将传入的参数填补进去了,此时用户输入的参数也就不会直接被当作 数据库语句 去执行了,那么注入也就可以被避免。

比如做一个很简单的时间盲注的演示:

morouu'^if(1,sleep(10),0);#

OK,用户名并没有被逃逸:

image-20210321190137784

显然是没有成功进行注入攻击了。

- 存在注入 -

关键的代码:

   @staticmethod
   def __queryFormInject(data: dict, pad: str = "AND") -> str:
       return f" {pad} ".join([f"`{key}` = '{value}'" for key,value in data.items()])
@staticmethod
   def __queryColumn(keys:list) -> str:
       return "`" + "`,`".join(keys) + "`"
   def queryInject(self, table: str, columns: list, conditions: dict, extraString: str, pad: str = "AND") -> dict:
       """数据查询"""
       if self.isConnect:
           col = self.__queryColumn(columns) if len(columns) > 0 else "*"
           q = f"SELECT {col} FROM `{table}`" + " WHERE " + self.__queryFormInject(conditions,pad=pad) + extraString if len(conditions) > 0 else f"SELECT {col} FROM `{table}`" + extraString
           self.point.execute(query=q)
           self.db.commit()
           return self.point.fetchall()
       else:
           return {}

还是老问题,虽然说是使用了 预存储过程 , 然而 预存储语句 是存在用户输入内容的,实际上就相当于:

SELECT * FROM `userinfo` WHERE `userName` = [合法数据] OR `userEmail` = [合法数据]

由于用户输入的内容会在 预存储过程 中被当作 数据库语句 进行处理,也就有可能发生注入了。

比如一个简单的盲注,同时用错误的密码:

morouu'^if(1,sleep(5),0);#

这里是页面卡住了,一直卡着不止 5 秒,这说明 预存储过程 出错了。

image-20210321191517365

如果改成以下语句,并且用错误密码去登录:

morouu'^if(1,0,sleep(5));#

此时可以得到一个 用户名/密码错误 的提示,也就可以用 是否卡住用户/密码错误 提示来进行盲注了:

image-20210321193243998

而如果此时保证输入正确的密码,并使用以下语句:

morouu'and(1);#

并且输入该用户正确的密码 123,此时 预存储语句 为:

SELECT * FROM `userinfo` WHERE `userName` = 'morouu'and(1);#' OR `userEmail` = 'morouu'and(1);#'

实际上处理的语句即为:

SELECT * FROM `userinfo` WHERE `userName` = 'morouu'and(1)

那么是可以登录成功的:

image-20210321192811076

显然语句是被成功注入进去了,也就是成功进行注入攻击。

- GOLANG 实现 bindparam 模式 -

上边的已经有关 用户注册用户登录 了,这里的主要危险点也暂且定为关于数据库 SELECT 数据的注入,也就是在查询用户信息时的注入。可以简单做一个 用户查询 的前端:

image-20210321193611305

此时大致处理逻辑为:

  • -> 获取用户传入查询信息
  • -> 检查uuid等是否正确
  • -> 检查用户ID是否存在并返回查询结果流

那么检查用户ID是否存在也就涉及到 SELECT 查询,这里会在 SELECT 布设 绝对安全存在注入 的对比。

- 代码实现 -

由于 GOLANG 属于强类型语言,直接传参做一个简单的demo好了。

配置的 dbConfig 包,路径 github.com/morouu/config

database.go 文件:

package dbConfig

var DBConfig = map[string]string{
	"host":     "localhost",
	"user":     "root",
	"pass":     "root",
	"database": "hg",
	"port":     "3306",
	"charset":  "utf8",
}

var TableConfig = map[string]map[string]string{
	"queryTable": {
		"prefix":    "",
		"tableName": "userinfo",
	},
}

实现对数据库调用包装的 sqllib 包,路径 github,com/morouu/lib/sql

connect.go 文件:

package sqllib

import (
	"database/sql"
	"log"
	"strings"

	_ "github.com/go-sql-driver/mysql"
)

const dbType string = "mysql"

func Connect(interfaces map[string]string) (*sql.DB, error) {

	params := []string{"host", "user", "pass", "port", "database", "charset"}

	for k, _ := range interfaces {

		if !isValueList(k, params) {

			log.Fatalf("params error -> %s", k)
		}
	}
	dbInfo := strings.Join([]string{interfaces["user"], ":", interfaces["pass"], "@tcp(", interfaces["host"], ":", interfaces["port"], ")/", interfaces["database"], "?charset=", interfaces["charset"]}, "")
	dbIns, err := sql.Open(dbType, dbInfo)

	if err != nil {
		return nil, err
	}

	dbIns.SetConnMaxLifetime(1000)
	dbIns.SetMaxIdleConns(100)

	if err = dbIns.Ping(); err != nil {
		return nil, err
	}

	log.Println("<<< Connect success! >>>")

	return dbIns, nil

}

func.go 文件:

package sqllib

import "strings"

func isValueList(value string, list []string) bool {

	for _, v := range list {
		if v == value {
			return true
		}
	}
	return false
}

func sqlColConnect(col []string) (result string) {

	if len(col) == 1 && col[0] == "*" {
		return "*"
	}
	for k, s := range col {
		col[k] = "`" + s + "`"
	}
	result = strings.Join(col, ",")
	return
}

func sqlConditionConnect(condition map[string]interface{}, pad string) (result string) {

	conStr := make([]string, 0, len(condition))
	if pad == "" {
		pad = " AND "
	}
	for k, _ := range condition {
		conStr = append(conStr, k+" = ?")
	}
	result = "(" + strings.Join(conStr, pad) + ")"
	return
}
func sqlConditionConnectInject(condition map[string]interface{}, pad string) (result string) {

	conStr := make([]string, 0, len(condition))
	if pad == "" {
		pad = " AND "
	}
	for k, v := range condition {
		conStr = append(conStr, k+" = '"+v.(string)+"'")
	}
	result = "(" + strings.Join(conStr, pad) + ")"
	return
}

query.go 文件:

package sqllib

import (
	"database/sql"
	"log"
	"github.com/morouu/config"
)

func QuerySafe(db *sql.DB, table string, prefix string, col []string, condition map[string]interface{}, extraString string) ([]tableInterface, error) {

	var sqlStr string = "SELECT " + sqlColConnect(col) + " from `" + prefix + table + "` WHERE " + sqlConditionConnect(condition, " AND ") + extraString

	var params []interface{} = make([]interface{}, 0, len(condition))
	var tableData = make([]tableInterface, 0)
	for _, v := range condition {
		params = append(params, v)
	}
	// 使用参数化查询即可避免SQL注入
	rows, err := db.Query(sqlStr, params...)
	if err != nil {
		log.Panicln(err)
	}
	if table == dbConfig.TableConfig["queryTable"]["tableName"] {
		for rows.Next() {
			var userRow userinfo
			err := rows.Scan(&userRow.userID, &userRow.userName, &userRow.userEmail, &userRow.userPass, &userRow.userSalt, &userRow.userLoginIP, &userRow.userLoginTime)
			if err != nil {
				log.Panicln(err)
			}
			tableData = append(tableData, userRow)
		}
	} else {
		return nil, nil
	}
	return tableData, nil

}

func QueryInject(db *sql.DB, table string, prefix string, col []string, condition map[string]interface{}, extraString string) ([]tableInterface, error) {
	// 如若直接手动拼接查询会存在SQL注入
	var sqlStr string = "SELECT " + sqlColConnect(col) + " FROM `" + prefix + table + "` WHERE " + sqlConditionConnectInject(condition, " AND ") + extraString
	var tableData = make([]tableInterface, 0)
	rows, err := db.Query(sqlStr)

	if err != nil {
		log.Panicln(err)
	}
	if table == dbConfig.TableConfig["queryTable"]["tableName"] {
		for rows.Next() {
			var userRow userinfo
			err := rows.Scan(&userRow.userID, &userRow.userName, &userRow.userEmail, &userRow.userPass, &userRow.userSalt, &userRow.userLoginIP, &userRow.userLoginTime)
			if err != nil {
				log.Panicln(err)
			}
			tableData = append(tableData, userRow)
		}

	} else {
		return nil, nil
	}
	return tableData, nil

}

tableStruct.go 文件:

package sqllib

import "strconv"

type tableInterface interface {
	GetStruct() map[string]string
}

type userinfo struct {
	userID        int64
	userName      string
	userEmail     string
	userPass      string
	userSalt      string
	userLoginIP   string
	userLoginTime string
}

func (s userinfo) GetStruct() (reMap map[string]string) {

	reMap = make(map[string]string, 6)
	reMap["userID"] = strconv.FormatInt(s.userID, 10)
	reMap["userName"] = s.userName
	reMap["userEmail"] = s.userEmail
	reMap["userPass"] = s.userPass
	reMap["userSalt"] = s.userSalt
	reMap["userLoginIP"] = s.userLoginIP
	reMap["userLoginTime"] = s.userLoginTime
	return reMap

}

其他的传参代码就先略过了。

- 注入分析 -

那么简单分析一下有关表和代码的 SELECT 语句,得到大致语句为:

SELECT * FROM `userinfo` WHERE ( userID = '[用户ID]' )

显然这时关于通过 用户ID 来查询用户信息的逻辑了,一般来说可以从 用户ID 使用 2 种攻击方式进行注入攻击:

  • 基于任意方式的盲注

这就比较简单了,比如基于时间盲注的 payload 可以如下:

-- 如果sleep被过滤,可以用笛卡尔积
123'^if(1,((select count(*) from information_schema.tables a,information_schema.tables b,information_schema.tables c)),0));#
-- 当然还可以用benchmark执行多次语句
123'^if(1,benchmark(7000000,md5('123')),0));#

而被数据库执行的为如下:

SELECT * FROM `userinfo` WHERE ( userID = '123'^if(1,((select count(*) from information_schema.tables a,information_schema.tables b,information_schema.tables c)),0));#' )

实际上也就是将用户的语句给嵌进去执行了,而注释符又将后边多余的影响语句合法性的内容给去掉了。

  • 联合查询的回显注入

联合查询实际也是依靠 union 来进行多记录查询,简单的 payload 可以如下:

-- 除了information_schema,还是可以用其他数据库的视图来查看当前数据库的所有表的
123')/**/union/**/select/**/1,user(),version(),(select/**/group_concat(table_name)/**/from/**/information_schema.tables/**/where/**/table_schema=database()),(select/**/group_concat(table_name)/**/from/**/sys.schema_table_statistics/**/where/**/table_schema=database()),(select/**/group_concat(object_name)/**/from/**/performance_schema.table_io_waits_summary_by_table/**/where/**/OBJECT_SCHEMA=database()),('1

被数据库执行的为如下:

SELECT * FROM `userinfo` WHERE ( userID = '123')/**/union/**/select/**/1,user(),version(),(select/**/group_concat(table_name)/**/from/**/information_schema.tables/**/where/**/table_schema=database()),(select/**/group_concat(table_name)/**/from/**/sys.schema_table_statistics/**/where/**/table_schema=database()),(select/**/group_concat(object_name)/**/from/**/performance_schema.table_io_waits_summary_by_table/**/where/**/OBJECT_SCHEMA=database()),('1')

此时就能够通过追加记录来查到其他内容了。

- 绝对安全 -

依然是关键代码:

func sqlColConnect(col []string) (result string) {

	if len(col) == 1 && col[0] == "*" {
		return "*"
	}
	for k, s := range col {
		col[k] = "`" + s + "`"
	}
	result = strings.Join(col, ",")
	return
}

func sqlConditionConnect(condition map[string]interface{}, pad string) (result string) {

	conStr := make([]string, 0, len(condition))
	if pad == "" {
		pad = " AND "
	}
	for k, _ := range condition {
		conStr = append(conStr, k+" = ?")
	}
	result = "(" + strings.Join(conStr, pad) + ")"
	return
}
func QuerySafe(db *sql.DB, table string, prefix string, col []string, condition map[string]interface{}, extraString string) ([]tableInterface, error) {

	var sqlStr string = "SELECT " + sqlColConnect(col) + " from `" + prefix + table + "` WHERE " + sqlConditionConnect(condition, " AND ") + extraString

	var params []interface{} = make([]interface{}, 0, len(condition))
	var tableData = make([]tableInterface, 0)
	for _, v := range condition {
		params = append(params, v)
	}
	// 使用参数化查询即可避免SQL注入
	rows, err := db.Query(sqlStr, params...)
	if err != nil {
		log.Panicln(err)
	}
	if table == dbConfig.TableConfig["queryTable"]["tableName"] {
		for rows.Next() {
			var userRow userinfo
			err := rows.Scan(&userRow.userID, &userRow.userName, &userRow.userEmail, &userRow.userPass, &userRow.userSalt, &userRow.userLoginIP, &userRow.userLoginTime)
			if err != nil {
				log.Panicln(err)
			}
			tableData = append(tableData, userRow)
		}
	} else {
		return nil, nil
	}
	return tableData, nil

}

这里实际上使用了 bindparam 模式,即是对传入的参数进行 参数绑定 ,简单来说即是告诉数据库,用户输入的内容是 查询的参数 不参与 数据库语句 的编译。

其中,数据库语句 为:

SELECT * from `userinfo` WHERE (userID = ?)

然后进行 参数绑定

var params []interface{} = make([]interface{}, 0, len(condition))
for _, v := range condition {
		params = append(params, v)
	}
rows, err := db.Query(sqlStr, params...)

由于用户输入的 查询的参数 被告知数据库这只是 查询参数 ,因而不会参与 数据库语句 的编译,那么也就无法从 查询的参数 这里逃逸从而形成注入攻击了。

比如输入以下的简单联合注入:

1')/**/union/**/select/**/1,user(),version(),database(),2,3,('1

这里还是返回了 userID1 的数据结果,实际上也就是把后边的 ')/**/union/**/select/**/1,user(),version(),database(),2,3,('1 内容给忽略了,没能成功逃逸出去:

image-20210321204735859

因为没能成功逃逸,注入攻击也就没能成功了。

- 存在注入 -

还是关键代码:

func sqlColConnect(col []string) (result string) {

	if len(col) == 1 && col[0] == "*" {
		return "*"
	}
	for k, s := range col {
		col[k] = "`" + s + "`"
	}
	result = strings.Join(col, ",")
	return
}
func sqlConditionConnectInject(condition map[string]interface{}, pad string) (result string) {

	conStr := make([]string, 0, len(condition))
	if pad == "" {
		pad = " AND "
	}
	for k, v := range condition {
		conStr = append(conStr, k+" = '"+v.(string)+"'")
	}
	result = "(" + strings.Join(conStr, pad) + ")"
	return
}
func QueryInject(db *sql.DB, table string, prefix string, col []string, condition map[string]interface{}, extraString string) ([]tableInterface, error) {
	// 如若直接手动拼接查询会存在SQL注入
	var sqlStr string = "SELECT " + sqlColConnect(col) + " FROM `" + prefix + table + "` WHERE " + sqlConditionConnectInject(condition, " AND ") + extraString
	var tableData = make([]tableInterface, 0)
	rows, err := db.Query(sqlStr)

	if err != nil {
		log.Panicln(err)
	}
	if table == dbConfig.TableConfig["queryTable"]["tableName"] {
		for rows.Next() {
			var userRow userinfo
			err := rows.Scan(&userRow.userID, &userRow.userName, &userRow.userEmail, &userRow.userPass, &userRow.userSalt, &userRow.userLoginIP, &userRow.userLoginTime)
			if err != nil {
				log.Panicln(err)
			}
			tableData = append(tableData, userRow)
		}

	} else {
		return nil, nil
	}
	return tableData, nil

}

这个是直接将用户传入的 查询的参数 直接拼接到了进行要编译的 数据库语句 里边了,并没有进行 参数绑定 ,是极有可能存在注入的。而语句即为:

SELECT * from `userinfo` WHERE (userID = [合法数据])

还是简单的联合注入查当前数据库存在的表名:

123')/**/union/**/select/**/1,user(),version(),(select/**/group_concat(table_name)/**/from/**/information_schema.tables/**/where/**/table_schema=database()),(select/**/group_concat(table_name)/**/from/**/sys.schema_table_statistics/**/where/**/table_schema=database()),(select/**/group_concat(object_name)/**/from/**/performance_schema.table_io_waits_summary_by_table/**/where/**/OBJECT_SCHEMA=database()),('1

显然是成功查询到表名了:

image-20210321205737305

那么也就说明该注入攻击是成功了。

- 总结 -

上边是分别通过使用 PHP PYTHON GOLANG 模拟一个简单的用户和数据库交互过程,其中也分别调用了 prepare exec bindparam 模式进行对比。不妨做一个简单的说明。

- prepare -

​ 先组一个 预拼接的语句 ,让后将其当作 数据库语句 进行 预处理 ,最后再将用户传入的参数填补进去了,此时用户输入的参数也就不会直接被当作 数据库语句 去执行了,那么注入也就可以被避免。

- exec -

​ 和 prepare 差不多的处理过程,不过是组一个 预存储的语句 ,后边和 prepare 的处理流程基本相同。只是exec 相较于 prepare 比较好的地方是,由于 exec 实现的是一个 预存储过程 ,在这个过程中如果存在有关时间的阻塞,例如 sleep() 之类的函数调用就会卡住,可以迷惑攻击者;当然这也可能意味着这个数据库连接可能 无法立即释放 ,大规模请求可能会导致服务器负载。

- bindparam -

​ 这个相较于上边 2 个是完全不一样的,关键点在于对传入的参数进行的 参数绑定 ,也就是告诉数据库,用户输入的内容是 查询的参数 不参与 数据库语句 的编译,那么也就可以避免用户输入的内容被数据库当作 数据库语句 执行了。