Python 5 5/8/2017


SUBMITTED BY: Guest

DATE: May 8, 2017, 5:56 p.m.

FORMAT: Python

SIZE: 6.7 kB

HITS: 439

  1. If you want more of my pastes visit: https://randompaste.000webhostapp.com/index.html
  2. --------------------------------------------------------------------------------------
  3. view my last post at: https://bitbin.it/uZ9AzH26/
  4. --------------------------------------------------------------------------------------
  5. from ctypes import BigEndianStructure as Structure
  6. from ctypes import Union as union
  7. import ctypes as c
  8. from enum import Enum
  9. import io
  10. import struct
  11. import math
  12. class RobotModes(Enum):
  13. #DISABLED = 0
  14. TELEOP = 0
  15. TEST = 1
  16. AUTO = 2
  17. ESTOPPED = 0x80
  18. M_ENABLED = 4
  19. class TeamStation(Enum):
  20. RED_1 = 0
  21. RED_2 = 1
  22. RED_3 = 2
  23. BLUE_1 = 3
  24. BLUE_2 = 4
  25. BLUE_3 = 5
  26. class RobotMode:
  27. def __init__(self, val=0):
  28. self.mode = 0
  29. self.update(val)
  30. def update(self, val):
  31. if self.mode != RobotModes.ESTOPPED: self.mode = RobotModes(val & ~4)
  32. self.enabled = bool(val & 4)
  33. def __int__(self):
  34. return self.mode.value | (4 if self.enabled else 0)
  35. def __repr__(self):
  36. return 'RobotMode(enabled={}, mode={})'.format(self.enabled, self.mode)
  37. #class RobotModeStruct(Structure):
  38. # _fields_ = [('test', c.c_uint, 1),
  39. # ('auto', c.c_uint, 1),
  40. # ('teleop', c.c_uint, 1),
  41. # ('enabled', c.c_uint, 1),
  42. # ('reserved', c.c_uint, 4),
  43. # ('estopped', c.c_uint, 1)]
  44. class TaggedStruct(Structure):
  45. _fields_ = [('tag', c.c_uint8)]
  46. _tag = 0
  47. def __init__(self, **kwargs):
  48. if 'tag' not in kwargs: kwargs['tag'] = type(self)._tag
  49. super().__init__(**kwargs)
  50. class StatusPacket(TaggedStruct):
  51. _tag = 1
  52. class BatteryVoltage(Structure):
  53. _fields_ = [('integer', c.c_uint8),
  54. ('decimal', c.c_uint8)]
  55. def f_set(self, val):
  56. self.integer = int(val)
  57. self.decimal = int(val * 100) % 100
  58. def __float__(self):
  59. return self.integer + self.decimal / 100.
  60. def __str__(self):
  61. return '{}.{:02}'.format(self.integer, self.decimal)
  62. _fields_ = [('mode', c.c_uint8),
  63. ('status', c.c_uint8),
  64. ('bat_voltage', BatteryVoltage),
  65. ('request', c.c_uint8)]
  66. tags = {}
  67. def r_tag(tag):
  68. def _wrap(cls):
  69. tags[tag] = cls
  70. return cls
  71. return _wrap
  72. class RecievePacket(Structure):
  73. @classmethod
  74. def i_read(cls, state, bytes):
  75. val = cls.from_buffer_copy(bytes)
  76. val.read(state)
  77. @r_tag(1)
  78. class ControlPacket(RecievePacket):
  79. name = 'control'
  80. _fields_ = [('mode', c.c_uint8), # control
  81. ('request', c.c_uint8),
  82. ('alliance', c.c_uint8),
  83. ]
  84. @r_tag(0x0c)
  85. class JoystickPacket(RecievePacket):
  86. @staticmethod
  87. def read(rs, joysticks):
  88. rs.joysticks = rs.joysticks[:len(joysticks)]
  89. for i in range(len(joysticks) - len(rs.joysticks)): rs.joysticks.append(JoystickState())
  90. for n, o in zip(joysticks, rs.joysticks):
  91. o.update(n)
  92. pass
  93. class ButtonState:
  94. def __init__(self, val, n):
  95. self.n = n
  96. self.val = val
  97. def __getitem__(self, n):
  98. return bool(self.val & 1<<n)
  99. def __len__(self):
  100. return self.n
  101. def __repr__(self):
  102. buttons = ('{:0' + str(self.n) + 'b}').format(self.val)
  103. return 'ButtonState(0b{}, {})'.format(buttons, self.n)
  104. def __iter__(self):
  105. for i in range(len(self)):
  106. yield self[i]
  107. raise StopIteration
  108. class JoystickState:
  109. def __init__(self, axes=[], buttons=None):
  110. self.axes = axes
  111. self.buttons = buttons
  112. def update(self, b):
  113. rio = io.BytesIO(b)
  114. n_axes = rio.read(1)[0]#struct.unpack('>B', rio.read(1))
  115. #if n_axes != len(self.axes): self.axes = [None] * n_axes
  116. d = rio.read(n_axes)
  117. self.axes = [((x ^ 0x80) - 0x80) / 128 for x in d]
  118. n_buttons = rio.read(1)[0]#struct.unpack('>B', rio.read(1))
  119. x = 0
  120. #for i in range(0, n_buttons, 8):
  121. # x <<= 8
  122. # x |= rio.read(1)[0]
  123. for c in rio.read(math.ceil(n_buttons / 8)):
  124. x <<= 8
  125. x |= c
  126. self.buttons = ButtonState(x, n_buttons)
  127. # for i in range(8):
  128. # self.buttons.append(bool(x & 1))
  129. # x >>= 1
  130. # bl -= 1
  131. # if bl == 0: break
  132. #print(repr(rio.read()))
  133. def __repr__(self):
  134. return 'JoystickState(axes=[{}], buttons={})'.format(', '.join(str(x) for x in self.axes), self.buttons)
  135. def tag_read(type, f):
  136. return type.from_buffer_copy(f.read(c.sizeof(type)))
  137. class RobotState:
  138. def __init__(self):
  139. #self.status = None
  140. self.voltage = 6.39
  141. self.mode = RobotMode()
  142. self.alliance = None
  143. self.status = 0x20
  144. self.joysticks = []
  145. def __repr__(self):
  146. return 'RobotState(mode={}, alliance={}, joysticks={})'.format(self.mode, self.alliance , self.joysticks)
  147. def recv(self, pkt):
  148. f = io.BytesIO(pkt)
  149. pn, = struct.unpack_from('>H', f.read(2))
  150. tag = f.read(1)[0]#, = struct.unpack_from('>B', f.read(1))
  151. #print(tag)
  152. assert(tag == 1) # right?
  153. self.read_control(tag_read(ControlPacket, f))
  154. joysticks = []
  155. while True:
  156. x = f.read(2)
  157. if x == b'': break
  158. len, tag = struct.unpack('>BB', x) # might be uint16
  159. #print(len, tag, repr(f.read(len)))
  160. # why is the protocol so weird
  161. t = tags[tag]
  162. if t == JoystickPacket:
  163. joysticks.append(f.read(len))
  164. else:
  165. t.i_read(self, f.read(len))
  166. if joysticks: JoystickPacket.read(self, joysticks)
  167. #o.read(self)
  168. # deal with it somehow
  169. return pn
  170. def read_control(self, packet):
  171. self.mode.update(packet.mode)
  172. #print('R', packet.mode)
  173. if packet.request: self.request(packet.request)
  174. self.alliance = TeamStation(packet.alliance)
  175. def write_status(self):
  176. p = StatusPacket()
  177. p.mode = int(self.mode)
  178. #print('S', p.mode)
  179. p.request = 0 # whatever
  180. p.bat_voltage.f_set(self.voltage)
  181. p.status = self.status
  182. p.tag = 1
  183. return p
  184. def send(self, pn):
  185. f = io.BytesIO()
  186. f.write(struct.pack('>H', pn))
  187. f.write(self.write_status())
  188. #print(pn)
  189. f.seek(0)
  190. return f.read()
  191. def respond(self, pkt):
  192. return self.send(self.recv(pkt))
  193. def request(self, req):
  194. #print('{:02x}'.format(req))
  195. pass
  196. #if r
  197. #print(req)
  198. import socketserver
  199. state = RobotState()
  200. class MyUDPHandler(socketserver.BaseRequestHandler):
  201. """
  202. This class works similar to the TCP handler class, except that
  203. self.request consists of a pair of data and client socket, and since
  204. there is no connection the client address must be given explicitly
  205. when sending data back via sendto().
  206. """
  207. def handle(self):
  208. data = self.request[0]#.strip()
  209. socket = self.request[1]
  210. #print("{} wrote:".format(self.client_address[0]))
  211. resp = state.respond(data)
  212. print(state)
  213. #print(resp)
  214. #print(repr(data))
  215. #a = bytes(state.send())
  216. socket.sendto(resp, (self.client_address[0], 1150))
  217. if __name__ == "__main__":
  218. HOST, PORT = "localhost", 1110
  219. server = socketserver.UDPServer((HOST, PORT), MyUDPHandler)
  220. server.serve_forever()
  221. z = b'2\xc5\x01\x00\x00\x00\x00\x00'

comments powered by Disqus