ICMP协议编程实践:实现ping命令(Python语言)

众所周知, ping 命令通过 ICMP 协议探测目标 IP 并计算 往返时间 。本文使用 Python 语言开发一个 ping 命令,以演示如何通过 套接字**发送接收**ICMP 协议报文。

注解

程序源码 可在本文末尾复制,或者在 Github 上下载: ping.py

报文封装

ICMP 报文承载在 IP 报文之上,头部结构非常简单:

../_images/476c9d2e44224eaa078f80bdbad440f9.gif

注意到, ICMP 头部只有三个固定字段,其余部分因消息类型而异。固定字段如下:

  • type消息类型
  • code代码
  • checksum校验和

ICMP 报文有很多不同的类型,由 typecode 字段区分。而 ping 命令使用其中两种:

../_images/c633276d3679c45943a4f2d7c2b55e05.png ping命令原理

如上图,机器 A 通过 回显请求 ( Echo Request ) 询问机器 B ;机器 B 收到报文后通过 回显答复 ( Echo Reply ) 响应机器 A 。这两种报文的典型结构如下:

../_images/31beaa9ddfb5278c7cd98dc4c8624a5b.png

对应的 type 以及 code 字段值列举如下:

表-1 回显报文类型
名称类型”代码“
回显请求80
回显答复00

按照惯例,回显报文除了固定字段,其余部分组织成 3 个字段:

  • 标识符 ( identifier ),一般填写进程 PID 以区分其他 ping 进程;
  • 报文序号 ( sequence number ),用于编号报文序列;
  • 数据 ( data ),可以是任意数据;

ICMP 规定, 回显答复 报文原封不动回传这些字段。因此,可以将 发送时间 封装在 数据负载 ( payload )中,收到答复后将其取出,用于计算 往返时间 ( round trip time )。

Python 标准库 struct 模块提供了用于 封装网络报文 的工具,可以这样封装数据负载:

  1. import struct
  2.  
  3. sending_ts = time.time()
  4. payload = struct.pack('!d', sending_ts)

这段代码将当前时间戳封装起来,其中 ! 表示 网络字节序d 表示双精度浮点。

封装报文头部也是类似的:

  1. header = struct.pack('!BBHHH', _type, code, checksum, ident, seq)
  2. icmp = header + payload

其中, B 表示长度为一个字节的无符号整数, H 表示长度为两个字节的无符号整数。

校验和

ICMP 报文校验和字段需要自行计算,计算步骤如下:

  • 0 为校验和封装一个用于计算的 伪报文
  • 将报文分成两个字节一组,如果总字节数为奇数,则在末尾追加一个零字节;
  • 对所有 双字节 进行按位求和;
  • 将高于 16 位的进位取出相加,直到没有进位;
  • 将校验和按位取反;

示例代码如下:

  1. def calculate_checksum(icmp):
  2. if len(icmp) % 2:
  3. icmp += b'\00'
  4.  
  5. checksum = 0
  6. for i in range(len(icmp)//2):
  7. word, = struct.unpack('!H', icmp[2i:2i+2])
  8. checksum += word
  9.  
  10. while True:
  11. carry = checksum >> 16
  12. if carry:
  13. checksum = (checksum & 0xffff) + carry
  14. else:
  15. break
  16.  
  17. checksum = ~checksum & 0xffff
  18.  
  19. return struct.pack('!H', checksum)

套接字

编程实现网络通讯,离不开 套接字 ( socket ),收发 ICMP 报文当然也不例外:

  1. from socket import socket, AF_INET, SOCK_RAW, IPPROTO_ICMP
  2.  
  3. s = socket(AF_INET, SOCK_RAW, IPPROTO_ICMP)

调用 sendto 系统调用发送 ICMP 报文:

  1. s.sendto(icmp, 0, ('xxx.xxx.xxx.xxx', 0))

其中,第一个参数为封装好的 ICMP 报文;第二个参数为发送标志位,无特殊要求一般填 0 ;第三个参数为目的 IP 地址-端口对,端口这里填 0

调用 recvfrom 方法接收 ICMP 报文:

  1. ip, (src_ip, _) = s.recvfrom(1500)
  2. icmp = ip[20:]

参数为接收缓冲区大小,这里用 1500 刚好是一个典型的 MTU 大小。注意到, recvfrom 系统调用返回 IP 报文,去掉前 20 字节的 IP 头部便得到 ICMP 报文。

注解

注意,创建 原始套接字 ( SOCK_RAW )需要超级用户权限。

程序实现

掌握基本原理后,便可着手编写代码了。

首先,实现 pack_icmp_echo_request 函数,用于封装 ICMP回显请求 报文:

  1. def pack_icmp_echo_request(ident, seq, payload):
  2. pseudo = struct.pack(
  3. '!BBHHH',
  4. 8,
  5. 0,
  6. 0,
  7. ident,
  8. seq,
  9. ) + payload
  10. checksum = calculate_checksum(pseudo)
  11. return pseudo[:2] + checksum + pseudo[4:]

2-9 行封装用于计算校验和的 伪报文 ,注意到 类型 字段为 8代码 字段为 0校验和 字段为 0标识符序号 以及 数据负载 字段由参数指定;第 10 行调用 calculate_checksum 函数计算 校验和 ;第 11 行替换伪报文中的校验和并返回。

对应地,实现 unpack_icmp_echo_reply 用于解析 ICMP回显答复 报文:

  1. def unpackicmp_echo_reply(icmp):
  2. _type, code, , ident, seq, = struct.unpack(
  3. '!BBHHH',
  4. icmp[:8]
  5. )
  6. if _type != 0:
  7. return
  8. if code != 0:
  9. return
  10.  
  11. payload = icmp[8:]
  12.  
  13. return ident, seq, payload

接着,实现 send_routine 用于循环发送 ICMP回显请求 报文:

  1. def send_routine(sock, addr, ident, magic, stop):
  2. # first sequence no
  3. seq = 1
  4.  
  5. while not stop:
  6. # currrent time
  7. sending_ts = time.time()
  8.  
  9. # packet current time to payload
  10. # in order to calculate round trip time from reply
  11. payload = struct.pack('!d', sending_ts) + magic
  12.  
  13. # pack icmp packet
  14. icmp = pack_icmp_echo_request(ident, seq, payload)
  15.  
  16. # send it
  17. sock.sendto(icmp, 0, (addr, 0))
  18.  
  19. seq += 1
  20. time.sleep(1)

该函数需要 5 个参数,分别如下:

  • sock ,用于发送报文的 套接字
  • addr ,目标 IP地址
  • ident标识符
  • magic ,打包在数据负载中的魔性字符串;
  • stop ,停止发送标识;

3 行定义 报文序号 ,从 1 开始递增;接着是发送循环,不停发包,每次相隔一秒;第 7 行获取 发送时间戳 ;第 11 行将时间戳以及魔性字符串打包成 数据负载 ;第 14 行调用 pack_icmp_echo_request 封装 回显请求 报文;第 17 行调用 sendto 系统调用 发送报文 ;第 19-20 行自增发送序号并等待一秒。

同样,实现 recv_routine 函数用于循环接收 ICMP回显答复 报文:

  1. def recvroutine(sock, ident, magic):
  2. while True:
  3. # wait for another icmp packet
  4. ip, (src_addr, ) = sock.recvfrom(1500)
  5.  
  6. # unpack it
  7. result = unpack_icmp_echo_reply(ip[20:])
  8. if not result:
  9. continue
  10.  
  11. # print info
  12. _ident, seq, payload = result
  13. if _ident != ident:
  14. continue
  15.  
  16. sending_ts, = struct.unpack('!d', payload[:8])
  17. print('%s seq=%d %5.2fms' % (
  18. src_addr,
  19. seq,
  20. (time.time()-sending_ts) * 1000,
  21. ))

4 行调用 recvfrom 系统调用接收 ICMP 报文;第 7 行调用 unpack_icmp_echo_reply解析报文 ;第 8-9 行忽略非回显答复报文;第 13-14 行检查标识符并忽略非法报文(可能是响应其他进程的);第 16 行从 数据负载 中取出 发送时间戳 ;第 17-21 行,计算 往返时间 并输出提示。

报文 发送接收 均实现完毕,如何让程序同时干两件事情呢?可以选用 线程 方案:

  1. def ping(addr):
  2. # create socket for sending and receiving icmp packet
  3. sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
  4.  
  5. # id field
  6. ident = os.getpid()
  7. # magic string to pad
  8. magic = b'1234567890'
  9.  
  10. # sender thread stop flag
  11. # append anything to stop
  12. sender_stop = []
  13.  
  14. # start sender thread
  15. # call send_routine function to send icmp forever
  16. args = (sock, addr, ident, magic, sender_stop,)
  17. sender = threading.Thread(target=send_routine, args=args)
  18. sender.start()
  19.  
  20. try:
  21. # receive icmp reply forever
  22. recv_routine(sock, ident, magic)
  23. except KeyboardInterrupt:
  24. pass
  25.  
  26. # tell sender thread to stop
  27. sender_stop.append(True)
  28.  
  29. # clean sender thread
  30. sender.join()
  31.  
  32. print()

3 行创建用于发送、接收报文的 套接字 ;第 6 行获取进程 PID 作为 标识符 ;第 16-18 行启动一个 子线程 执行 报文发送 函数;第 20-24主线程 执行 报文接收 函数直至用户按下 ctrl-C ;第 27 行程序退出前,通知发送线程退出并回收线程资源( join )。

将以上所有代码片段组装在一起,便得到 ping.py 命令。迫不及待想运行一下:

  1. $ sudo python ping.py 8.8.8.8
  2. 8.8.8.8 seq=1 23.18ms
  3. 8.8.8.8 seq=2 22.25ms
  4. 8.8.8.8 seq=3 34.18ms

It works!

程序源码

  1. #!/usr/bin/env python
  2. # -- encoding=utf8 --
  3.  
  4. '''
  5. FileName: ping.py
  6. Author: Fasion Chan
  7. @contact: fasionchan@gmail.com
  8. @version: $Id$
  9.  
  10. Description:
  11.  
  12. Changelog:
  13.  
  14. '''
  15.  
  16. import os
  17. import socket
  18. import struct
  19. import sys
  20. import threading
  21. import time
  22.  
  23. def calculatechecksum(icmp):
  24. if len(icmp) % 2:
  25. icmp += b'\00'
  26. checksum = 0
  27. for i in range(len(icmp)//2):
  28. word, = struct.unpack('!H', icmp[2i:2i+2])
  29. checksum += word
  30. while True:
  31. carry = checksum >> 16
  32. if carry:
  33. checksum = (checksum & 0xffff) + carry
  34. else:
  35. break
  36. checksum = ~checksum & 0xffff
  37. return struct.pack('!H', checksum)
  38. def calculate_checksum(icmp):
  39. highs = icmp[0::2]
  40. lows = icmp[1::2]
  41. checksum = ((sum(highs) << 8) + sum(lows))
  42. while True:
  43. carry = checksum >> 16
  44. if carry:
  45. checksum = (checksum & 0xffff) + carry
  46. else:
  47. break
  48. checksum = ~checksum & 0xffff
  49. return struct.pack('!H', checksum)
  50. def pack_icmp_echo_request(ident, seq, payload):
  51. pseudo = struct.pack(
  52. '!BBHHH',
  53. 8,
  54. 0,
  55. 0,
  56. ident,
  57. seq,
  58. ) + payload
  59. checksum = calculate_checksum(pseudo)
  60. return pseudo[:2] + checksum + pseudo[4:]
  61. def unpack_icmp_echo_reply(icmp):
  62. _type, code, , ident, seq, = struct.unpack(
  63. '!BBHHH',
  64. icmp[:8]
  65. )
  66. if type != 0:
  67. return
  68. if code != 0:
  69. return
  70. payload = icmp[8:]
  71. return ident, seq, payload
  72. def send_routine(sock, addr, ident, magic, stop):
  73. # first sequence no
  74. seq = 1
  75. while not stop:
  76. # currrent time
  77. sending_ts = time.time()
  78. # packet current time to payload
  79. # in order to calculate round trip time from reply
  80. payload = struct.pack('!d', sending_ts) + magic
  81. # pack icmp packet
  82. icmp = pack_icmp_echo_request(ident, seq, payload)
  83. # send it
  84. sock.sendto(icmp, 0, (addr, 0))
  85. seq += 1
  86. time.sleep(1)
  87. def recv_routine(sock, ident, magic):
  88. while True:
  89. # wait for another icmp packet
  90. ip, (src_addr, ) = sock.recvfrom(1500)
  91.  
  92. # unpack it
  93. result = unpackicmpechoreply(ip[20:])
  94. if not result:
  95. continue
  96. # print info
  97. ident, seq, payload = result
  98. if _ident != ident:
  99. continue
  100. sending_ts, = struct.unpack('!d', payload[:8])
  101. print('%s seq=%d %5.2fms' % (
  102. src_addr,
  103. seq,
  104. (time.time()-sending_ts) * 1000,
  105. ))
  106. def ping(addr):
  107. # create socket for sending and receiving icmp packet
  108. sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
  109. # id field
  110. ident = os.getpid()
  111. # magic string to pad
  112. magic = b'1234567890'
  113. # sender thread stop flag
  114. # append anything to stop
  115. sender_stop = []
  116. # start sender thread
  117. # call send_routine function to send icmp forever
  118. args = (sock, addr, ident, magic, sender_stop,)
  119. sender = threading.Thread(target=send_routine, args=args)
  120. sender.start()
  121. try:
  122. # receive icmp reply forever
  123. recv_routine(sock, ident, magic)
  124. except KeyboardInterrupt:
  125. pass
  126. # tell sender thread to stop
  127. sender_stop.append(True)
  128. # clean sender thread
  129. sender.join()
  130. print()
  131. if __name == '__main':
  132. ping(sys.argv[1])

下一步

本节以 C 语言为例,演示了 ICMP 编程方法。如果你对其他语言感兴趣,请按需取用:

订阅更新,获取更多学习资料,请关注我们的 微信公众号

../_images/wechat-mp-qrcode.png 小菜学编程

参考文献