使用python编写端口扫描工具

打印 上一主题 下一主题

主题 984|帖子 984|积分 2952

端口扫描工具编写


目录

0x01:实现端口扫描的方式

一、TCP扫描:

TCP connect扫描,也称为全连接扫描,这种方式直接连接到目标端口,完成了TCP三次握手的过程,这种方式扫描结果比较准确,但速度比较慢而且可轻易被目标系统检测到。
二、SYN扫描:

TCP SYN扫描是另一种TCP扫描。端口扫描工具不使用操作系统原生网络功能,而是自行生成、发送IP数据包,并监控其回应。这种扫描模式被称为“半开放扫描”,因为它从不建立完整的TCP连接。端口扫描工具生成一个SYN包,如果目标端口开放,则会返回SYN-ACK包。扫描端回应一个RST包,然后在握手完成前关闭连接。如果端口关闭了但未使用过滤,目标端口应该会持续返回RST包。
三、UDP扫描:

UDP扫描发送空的(没有数据)UDP报头到每个目标端口。 如果返回ICMP端口不可到达错误(类型3,代码3), 该端口是closed(关闭的)。 其它ICMP不可到达错误(类型3, 代码1,2,9,10,或者13)表明该端口是filtered(被过滤的)。 偶尔地,某服务会响应一个UDP报文,证明该端口是open(开放的)。
0x02:使用python实现端口扫描

一、使用socket库的connect()方法扫描

原理:指定IP和端口号,连接到特定主机对应的远程socket,连接失败会抛出timeout错误,通过判断是否连接成功来判断端口是否开放。
1、核心代码

首先使用socket.socket(socket.AF_INET, socket.SOCK_STREAM)建立一个基于网络并且使用TCP协议的套接字;
之后使用connect()尝试建立连接,需要的参数为IP和端口号;
最后根据判断返回的结果是成功还是超时,来判断端口是否开放。
就这三步,转化成实际的代码就是下面这样:
  1. import socket
  2. def star_scan(ip, port):
  3.     try:
  4.         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)   # 创建一个基于网络并且使用tcp协议的套接字,用于通信。
  5.         s.settimeout(0.02)        # 设置超时时间
  6.         s.connect((ip, port))
  7.     except Exception as e:
  8.         print(e)
  9.     else:
  10.         res = s.recv(3096).decode('utf-8').encode()
  11.         print(res)
  12.         print("[+]{}:{} \topen".format(ip, port))
  13.     finally:
  14.         s.close()
  15.         
  16. ip = "192.168.247.135"
  17. port = 22
  18. star_scan(ip, port)
复制代码
主要代码都在star_scan()函数里,往里面传入指定的IP和端口,就可以看到该端口是否开放。


上图是程序运行的结果。
只做成这样肯定是不够的,下一步是实现批量端口的扫描,但是探测的原理还是这个。
2、批量端口扫描

要实现批量端口的扫描,主要的问题就是如何让计算机知道你想要扫的端口有哪些。
最简单的方法是一个一个指定,把要扫的端口全输入进去或者放到文件里让程序去读。一般情况下这都是一种很笨的方法,假如我们要指定1000个端口去扫描,那要输1000个端口,要是扫描全端口就更加麻烦。但是有的时候也是会使用这种方法的,有些端口就是比其他端口更常使用,这些算是常用端口;有些端口打开后假如被恶意利用,会造成非常严重的后果(比如3389端口),这些算是高危端口。
在知道这些端口的存在后,我们可以把它们存到列表里,每次扫描时都默认扫描这些端口,因为它们的利用价值更大,这样扫效率也更高。
把哪些端口定义为默认扫描端口也是门学问,太多太少都不太好,可以去GitHub上找一下大佬写的端口扫描工具或者综合扫描工具,看看他们默认都扫描哪些端口,或者看看nmap等工具是怎么定义的,这里就不列出来了。
还有一种常用的方式是指定端口范围,一般使用-指定,比如1-100这样。
通过设定默认扫描端口和输入端口范围,可以满足大部分使用需求了。由于设定默认端口在代码的实现上比较简单,所以这里主要实现指定端口范围,也就是让程序能够解析1-100。具体实现代码如下:
  1. import socket
  2. def make_port_list(ports):
  3.     new_port_list = []
  4.     if "," in ports:
  5.         temp_list = ports.split(",")
  6.         for port in temp_list:
  7.           if "-" in port:
  8.             for p in range(int(port.split("-")[0]), int(port.split("-")[1]) + 1):
  9.                 new_port_list.append(p)
  10.           else:
  11.             new_port_list.append(port)
  12.     elif "-" in ports:
  13.         for p in range(int(ports.split("-")[0]), int(ports.split("-")[1]) + 1):
  14.             new_port_list.append(p)
  15.     else:
  16.         new_port_list.append(ports)
  17.     return new_port_list
  18. def star_scan(ip, port):
  19.     try:
  20.         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)   # 创建一个基于网络并且使用tcp协议的套接字,用于通信。
  21.         s.settimeout(0.02)        # 设置超时时间
  22.         s.connect((ip, port))
  23.     except Exception as e:
  24.         pass
  25.         #print(e)
  26.         #print("[+]{}:{} \tclosed".format(ip, port))
  27.     else:
  28.         res = s.recv(3096).decode('utf-8').encode()
  29.         print(res)
  30.         print("[+]{}:{} \topen".format(ip, port))
  31.     finally:
  32.         s.close()
  33.         
  34. ip = "192.168.247.135"
  35. port = "21,22-24"
  36. port_list = make_port_list(port)
  37. print("total port num:{}".format(len(port_list)))
  38. for port in port_list:
  39.   star_scan(ip, int(port))
复制代码
正常情况下,要扫描的端口应以命令参数的形式输入进去,这里为了方便直接以变量形式写进了代码。
这里扫描了指定端口21和端口范围22-24,以,为分隔符区分,通过make_port_list()函数解析,返回一个端口列表,之后循环调用star_scan函数来实现批量扫描,运行结果如下:

这里把扫描失败时的输出提示给注释掉了,因为要是扫描的端口过多这里会看着非常乱。
程序到这里还没有完成,可以看到这里在扫描4个端口的情况下用了快1秒的时间,实际扫描的时候肯定不会只扫几个端口,那时候会扫描成千上万个端口,这时的耗时会非常大:

这是由于网络I/O操作非常的耗时,程序大部分时间都耗在了等待上面,下面就要解决这个问题。
3、使用协程提高效率

关于协程,这里也不会做太多描述(我也不太懂)。大概就是在一个协程进入IO等待时,会自动切换到其他协程继续执行,也就是最大化的利用了IO等待时间,这个过程一直都是单线程,只是程序在一个协程进入等待后切换到另一个协程给CPU处理,而对于CPU来说则一直是同一个程序在给它发任务,CPU是感受不到区别的。
套用到这个程序里,可以理解为:我们每扫一个端口都要建立一次连接,而每次建立连接都会有一次等待,那我们就把每一个要建立的连接都放到一个协程里,在一个连接也就是协程进入IO等待后,系统会自动切换去建立下一个连接,这样效率就会大大提升,这个切换时自动的,完全不需要我们去做什么。
python中可以使用gevent库来实现协程,操作也非常简单。
1、使用gevent.spawn(函数名)创建协程
2、使用gevent.join()或者gevent.joinall()执行协程
于是这里就用我们程序里的star_scan()函数创建协程,协程的数量等于我们要扫描的次数,最后一起执行就可以了。
还有一个问题就是每一个star_scan()函数从哪里获取我们要扫描的IP和端口号信息,这里我们使用queue(队列)来解决这个问题。先把所有的IP和端口号放入队列,根据队列长度创建协程,之后每个协程在执行任务的时候就把队列里的信息提取出来一个,当把队列空了的时候,任务也就处理完了。
下面是完整代码:
  1. import socket
  2. import sys
  3. from queue import Queue
  4. from gevent import monkey; monkey.patch_socket()
  5. import gevent
  6. def make_port_list(ports):
  7.     new_port_list = []
  8.     if "," in ports:
  9.         temp_list = ports.split(",")
  10.         for port in temp_list:
  11.           if "-" in port:
  12.             for p in range(int(port.split("-")[0]), int(port.split("-")[1]) + 1):
  13.                 new_port_list.append(p)
  14.           else:
  15.             new_port_list.append(port)
  16.     elif "-" in ports:
  17.         for p in range(int(ports.split("-")[0]), int(ports.split("-")[1]) + 1):
  18.             new_port_list.append(p)
  19.     else:
  20.         new_port_list.append(ports)
  21.     return new_port_list
  22. def coroutines():
  23.     # 开启多协程
  24.     cos = []
  25.     num = ip_port.qsize()
  26.     print(num)
  27.     for i in range(num):
  28.         # 调用工作函数
  29.         cor = gevent.spawn(star_scan)
  30.         cos.append(cor)
  31.     gevent.joinall(cos)
  32. def star_scan():
  33.   sockets = ip_port.get()
  34.   ip = sockets[0]
  35.   port = int(sockets[1])
  36.   try:
  37.       s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)   # 创建一个基于网络并且使用tcp协议的套接字,用于通信。
  38.       s.settimeout(0.02)        # 设置超时时间
  39.       s.connect((ip, port))
  40.   except Exception as e:
  41.       pass
  42.       #print("[+]{}:{} \tclosed".format(ip, port))
  43.   else:
  44.       res = s.recv(3096).decode('utf-8').encode()
  45.       print(res)
  46.       print("[+]{}:{} \topen".format(ip, port))
  47.   finally:
  48.       s.close()
  49. ip_port = Queue()
  50. ip = "192.168.247.135"
  51. port = "21,22-2400"
  52. port_list = make_port_list(port)
  53. print("total port num:{}".format(len(port_list)))
  54. for port in port_list:
  55.   ip_port.put([ip, port])
  56. coroutines()
复制代码
主要是加了使用ip_port = Queue()创建队列,再用put()函数将参数放入队列。最后调用coroutines()函数开启协程,star_scan()里获取参数的方式也变成了用get()获取队列中的数据。

可以看到速度确实是快了,但是准确性降低了,22端口明明是开着的这里却没有扫到......
这里不太清楚具体原因,但是也是可以解决的,把超时时间设置的长一点就好了,也就是修改star_scan()里的settimeout()里面的值。还有一种解决方案是修改每次创建协程的最大值,超时时间少点也能扫到。
我稍微改了下代码,把直接创建所有协程改成50个50个创建,最后代码如下:
  1. import socket
  2. import sys
  3. from queue import Queue
  4. from gevent import monkey; monkey.patch_socket()
  5. import gevent
  6. def make_port_list(ports):
  7.     new_port_list = []
  8.     if "," in ports:
  9.         temp_list = ports.split(",")
  10.         for port in temp_list:
  11.           if "-" in port:
  12.             for p in range(int(port.split("-")[0]), int(port.split("-")[1]) + 1):
  13.                 new_port_list.append(p)
  14.           else:
  15.             new_port_list.append(port)
  16.     elif "-" in ports:
  17.         for p in range(int(ports.split("-")[0]), int(ports.split("-")[1]) + 1):
  18.             new_port_list.append(p)
  19.     else:
  20.         new_port_list.append(ports)
  21.     return new_port_list
  22. def coroutines(num):
  23.     # 开启多协程
  24.     # print("开启协程{}个".format(num))
  25.     cos = []
  26.     for i in range(num):
  27.         # 调用工作函数
  28.         cor = gevent.spawn(star_scan)
  29.         cos.append(cor)
  30.     gevent.joinall(cos)
  31. def star_scan():
  32.   sockets = ip_port.get()
  33.   ip = sockets[0]
  34.   port = int(sockets[1])
  35.   try:
  36.       s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)   # 创建一个基于网络并且使用tcp协议的套接字,用于通信。
  37.       s.settimeout(0.02)        # 设置超时时间
  38.       s.connect((ip, port))
  39.   except Exception as e:
  40.       pass
  41.       #print("[+]{}:{} \tclosed".format(ip, port))
  42.   else:
  43.       res = s.recv(3096).decode('utf-8').encode()
  44.       print(res)
  45.       print("[+]{}:{} \topen".format(ip, port))
  46.   finally:
  47.       s.close()
  48. ip = "192.168.247.135"
  49. port = "21,22-2400"
  50. port_list = make_port_list(port)
  51. print("total port num:{}".format(len(port_list)))
  52. ip_port = Queue()   # 创建队列
  53. for port in port_list:  
  54.   ip_port.put([ip, port])
  55. num = 50
  56. if len(port_list)%num == 0:
  57.   turn = int(len(port_list)/num)
  58.   for i in range(turn):
  59.     coroutines(num)
  60. else:
  61.   turn = int(len(port_list)/num)+1
  62.   for i in range(turn):
  63.     if i == turn-1:
  64.       coroutines(len(port_list)%num)
  65.     else:
  66.       coroutines(num)
复制代码
最后结果就可以扫出来了,后续可以把每次创建协程数当作参数输入进去,这样方便调整。
0x03:总结

到目前为止程序就写的差不多了,只能说基本能用了。接下来就该是实现命令行参数的解析,不能每次换目标都要改代码,这里可以使用argparse模块来解析命令行参数,也可以使用gooey弄个简单的图形化界面,因为懒我就没接着往下写了。
这次是使用的socket库来实现的TCP扫描,其实方法不是唯一的,用scapy发包来扫描也是可以的,甚至直接用nmap模块调nmap来扫描也行。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

风雨同行

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表