--- /dev/null
+# Anemon Dhcp
+# Copyright (C) 2005 Mathieu Ignacio -- mignacio@april.org
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
+
+import operator
+from struct import unpack
+from struct import pack
+from dhcp_basic_packet import *
+from dhcp_constants import *
+from type_ipv4 import ipv4
+from type_strlist import strlist
+class DhcpPacket(DhcpBasicPacket):
+
+
+ # Useful function for debugging
+ def PrintHeaders(self):
+ print "# Header fields\n"
+ print "readable_dhcp_headers = {"
+ for opt in ['op','htype','hlen','hops','xid','secs','flags',
+ 'ciaddr','yiaddr','siaddr','giaddr','chaddr','sname','file'] :
+ begin = DhcpFields[opt][0]
+ end = DhcpFields[opt][0]+DhcpFields[opt][1]
+ data = self.packet_data[begin:end]
+ if DhcpFieldsTypes[opt] == "int" : result = str(data[0])
+ if DhcpFieldsTypes[opt] == "int2" : result = str(data[0]*256+data[0])
+ if DhcpFieldsTypes[opt] == "int4" : result = str(ipv4(data).int())
+ if DhcpFieldsTypes[opt] == "str" : result = strlist(data).str()
+ if DhcpFieldsTypes[opt] == "ipv4" : result = ipv4(data).str()
+ if DhcpFieldsTypes[opt] == "hwmac" : result = "".join(map(chr,data))
+
+ line = "\t'"+opt+"':"+str(data)+",\t# "+result
+ print line
+ print "\t'end':'true'}"
+
+ # Useful function for debugging
+ def PrintOptions(self):
+ print "# Options fields"
+ print "readable_dhcp_options = {"
+ for opt in self.options_data.keys():
+ data = self.options_data[opt]
+ result = ""
+ optnum = DhcpOptions[opt]
+ if DhcpOptionsTypes[optnum] == "char" : result = str(data[0])
+ if DhcpOptionsTypes[optnum] == "16-bits" : result = str(data[0]*256+data[0])
+ if DhcpOptionsTypes[optnum] == "32bits" : result = str(ipv4(data).int())
+ if DhcpOptionsTypes[optnum] == "string" : result = strlist(data).str()
+ if DhcpOptionsTypes[optnum] == "ipv4" : result = ipv4(data).str()
+ if DhcpOptionsTypes[optnum] == "ipv4+" :
+ for i in range(0,len(data),4) :
+ if len(data[i:i+4]) == 4 :
+ result += ipv4(data[i:i+4]).str() + " - "
+ line = "\t'"+opt+"':"+str(data)+",\t# "+result
+ print line
+ print "\t'end':'true'}"
+
+
+
+ # FIXME: This is called from IsDhcpSomethingPacket, but is this really
+ # needed? Or maybe this testing should be done in
+ # DhcpBasicPacket.DecodePacket().
+
+ # Test Packet Type
+ def IsDhcpSomethingPacket(self,type):
+ if self.IsDhcpPacket() == False : return False
+ if self.IsOption("dhcp_message_type") == False : return False
+ if self.GetOption("dhcp_message_type") != type : return False
+ return True
+
+ def IsDhcpDiscoverPacket(self):
+ return self.IsDhcpSomethingPacket([1])
+
+ def IsDhcpOfferPacket(self):
+ return self.IsDhcpSomethingPacket([2])
+
+ def IsDhcpRequestPacket(self):
+ return self.IsDhcpSomethingPacket([3])
+
+ def IsDhcpDeclinePacket(self):
+ return self.IsDhcpSomethingPacket([4])
+
+ def IsDhcpAckPacket(self):
+ return self.IsDhcpSomethingPacket([5])
+
+ def IsDhcpNackPacket(self):
+ return self.IsDhcpSomethingPacket([6])
+
+ def IsDhcpReleasePacket(self):
+ return self.IsDhcpSomethingPacket([7])
+
+ def IsDhcpInformPacket(self):
+ return self.IsDhcpSomethingPacket([8])
+
+
+ def GetMultipleOptions(self,options=()):
+ result = {}
+ for each in options:
+ result[each] = self.GetOption(each)
+ return result
+
+ def SetMultipleOptions(self,options={}):
+ for each in options.keys():
+ self.SetOption(each,options[each])
+
+
+
+
+
+
+ # Creating Response Packet
+
+ # Server-side functions
+ # From RFC 2132 page 28/29
+ def CreateDhcpOfferPacketFrom(self,src): # src = discover packet
+ self.SetOption("htype",src.GetOption("htype"))
+ self.SetOption("xid",src.GetOption("xid"))
+ self.SetOption("flags",src.GetOption("flags"))
+ self.SetOption("giaddr",src.GetOption("giaddr"))
+ self.SetOption("chaddr",src.GetOption("chaddr"))
+ self.SetOption("ip_address_lease_time",src.GetOption("ip_address_lease_time"))
+ self.TransformToDhcpOfferPacket()
+
+ def TransformToDhcpOfferPacket(self):
+ self.SetOption("dhcp_message_type",[2])
+ self.SetOption("op",[2])
+ self.SetOption("hlen",[6])
+
+ self.DeleteOption("secs")
+ self.DeleteOption("ciaddr")
+ self.DeleteOption("request_ip_address")
+ self.DeleteOption("parameter_request_list")
+ self.DeleteOption("client_identifier")
+ self.DeleteOption("maximum_message_size")
+
+
+
+
+
+ """ Dhcp ACK packet creation """
+ def CreateDhcpAckPacketFrom(self,src): # src = request or inform packet
+ self.SetOption("htype",src.GetOption("htype"))
+ self.SetOption("xid",src.GetOption("xid"))
+ self.SetOption("ciaddr",src.GetOption("ciaddr"))
+ self.SetOption("flags",src.GetOption("flags"))
+ self.SetOption("giaddr",src.GetOption("giaddr"))
+ self.SetOption("chaddr",src.GetOption("chaddr"))
+ self.SetOption("ip_address_lease_time_option",src.GetOption("ip_address_lease_time_option"))
+ self.TransformToDhcpAckPacket()
+
+ def TransformToDhcpAckPacket(self): # src = request or inform packet
+ self.SetOption("op",[2])
+ self.SetOption("hlen",[6])
+ self.SetOption("dhcp_message_type",[5])
+
+ self.DeleteOption("secs")
+ self.DeleteOption("request_ip_address")
+ self.DeleteOption("parameter_request_list")
+ self.DeleteOption("client_identifier")
+ self.DeleteOption("maximum_message_size")
+
+
+ """ Dhcp NACK packet creation """
+ def CreateDhcpNackPacketFrom(self,src): # src = request or inform packet
+
+ self.SetOption("htype",src.GetOption("htype"))
+ self.SetOption("xid",src.GetOption("xid"))
+ self.SetOption("flags",src.GetOption("flags"))
+ self.SetOption("giaddr",src.GetOption("giaddr"))
+ self.SetOption("chaddr",src.GetOption("chaddr"))
+ self.TransformToDhcpNackPacket()
+
+ def TransformToDhcpNackPacket(self):
+ self.SetOption("op",[2])
+ self.SetOption("hlen",[6])
+ self.DeleteOption("secs")
+ self.DeleteOption("ciaddr")
+ self.DeleteOption("yiaddr")
+ self.DeleteOption("siaddr")
+ self.DeleteOption("sname")
+ self.DeleteOption("file")
+ self.DeleteOption("request_ip_address")
+ self.DeleteOption("ip_address_lease_time_option")
+ self.DeleteOption("parameter_request_list")
+ self.DeleteOption("client_identifier")
+ self.DeleteOption("maximum_message_size")
+ self.SetOption("dhcp_message_type",[6])
+
+
+
+
+
+
+
+ """ GetClientIdentifier """
+
+ def GetClientIdentifier(self) :
+ if self.IsOption("client_identifier") :
+ return self.GetOption("client_identifier")
+ return []
+
+ def GetGiaddr(self) :
+ return self.GetOption("giaddr")
+
+ def GetHardwareAddress(self) :
+ length = self.GetOption("hlen")[0]
+ full_hw = self.GetOption("chaddr")
+ if length!=[] and length<len(full_hw) : return full_hw[0:length]
+ return full_hw
+