python-network
Python module for easy networking
network.py
Go to the documentation of this file.
1 # vim: set fileencoding=utf-8 foldmethod=marker :
2 
3 # {{{ Copyright 2013-2019 Bas Wijnen <wijnen@debian.org>
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or(at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 # }}}
17 
18 
23 
24 '''@file
25 Python module for easy networking. This module intends to make networking
26 easy. It supports tcp and unix domain sockets. Connection targets can be
27 specified in several ways.
28 '''
29 
30 '''@package network Python module for easy networking.
31 This module intends to make networking easy. It supports tcp and unix domain
32 sockets. Connection targets can be specified in several ways.
33 '''
34 
35 # {{{ Imports.
36 import math
37 import sys
38 import os
39 import socket
40 import select
41 import re
42 import time
43 import inspect
44 import fhs
45 modulename = 'network'
46 fhs.module_info(modulename, 'Networking made easy', '0.2', 'Bas Wijnen <wijnen@debian.org>')
47 fhs.module_option(modulename, 'tls', 'tls hostname for server sockets or True/False for client sockets. Set to - to disable tls on server. If left empty, uses hostname for server, True for client sockets.', default = '')
48 import traceback
49 
50 try:
51  import ssl
52  have_ssl = True
53 except:
54  have_ssl = False
55 # }}}
56 
57 # {{{ Interface description
58 # - connection setup
59 # - connect to server
60 # - listen on port
61 # - when connected
62 # - send data
63 # - asynchronous read
64 # - blocking read for data
65 
66 # implementation:
67 # - Server: listener, creating Sockets on accept
68 # - Socket: used for connection; symmetric
69 # }}}
70 
71 if sys.version >= '3':
72  makestr = lambda x: str(x, 'utf8', 'replace') if isinstance(x, bytes) else x
73 else:
74  makestr = lambda x: x
75 
76 log_output = sys.stderr
77 log_date = False
78 
79 
85 def set_log_output(file):
86  global log_output, log_date
87  log_output = file
88  log_date = True
89 # }}}
90 
91 
102 def log(*message, filename = None, line = None, funcname = None, depth = 0):
103  t = time.strftime('%F %T' if log_date else '%T')
104  source = inspect.currentframe().f_back
105  for d in range(depth):
106  source = source.f_back
107  code = source.f_code
108  if filename is None:
109  filename = os.path.basename(code.co_filename)
110  if funcname is None:
111  funcname = code.co_name
112  if line is None:
113  line = source.f_lineno
114  for msg in message:
115  log_output.write(''.join(['%s %s:%s:%d:\t%s\n' % (t, filename, funcname, line, m) for m in str(msg).split('\n')]))
116  log_output.flush()
117 # }}}
118 
119 
123 def lookup(service):
124  if isinstance(service, int):
125  return service
126  try:
127  return socket.getservbyname(service)
128  except socket.error:
129  pass
130  return int(service)
131 # }}}
132 
133 
140 class _Fake:
141 
145  def __init__(self, i, o = None):
146  self._i = i
147  self._o = o if o is not None else i
148 
150  def close(self):
151  pass
152 
154  def sendall(self, data):
155  while len(data) > 0:
156  fd = self._o if isinstance(self._o, int) else self._o.fileno()
157  ret = os.write(fd, data)
158  if ret >= 0:
159  data = data[ret:]
160  continue
161  log('network.py: Failed to write data')
162  traceback.print_exc()
163 
165  def recv(self, maxsize):
166  #log('recv fake')
167  return os.read(self._i.fileno(), maxsize)
168 
170  def fileno(self):
171  # For reading.
172  return self._i.fileno()
173 # }}}
174 
175 
181 def wrap(i, o = None):
182  return Socket(_Fake(i, o))
183 # }}}
184 
185 
187 class Socket:
188 
204  def __init__(self, address, tls = False, disconnect_cb = None, remote = None, connections = None):
205 
206  self.tlstls = tls
207 
208  self.remoteremote = remote
209 
210  self.connectionsconnections = connections
211  if self.connectionsconnections is not None:
212  self.connectionsconnections.add(self)
213 
214  self.socketsocket = None
215  self._disconnect_cb_disconnect_cb = disconnect_cb
216  self._event_event = None
217  self._linebuffer_linebuffer = b''
218  if isinstance(address, (_Fake, socket.socket)):
219  #log('new %d' % id(address))
220  self.socketsocket = address
221  return
222  if isinstance(address, str) and '/' in address:
223  # Unix socket.
224  # TLS is ignored for those.
225  self.remoteremote = address
226  self.socketsocket = socket.socket(socket.AF_UNIX)
227  self.socketsocket.connect(self.remoteremote)
228  else:
229  if isinstance(address, str) and ':' in address:
230  host, port = address.rsplit(':', 1)
231  else:
232  host, port = 'localhost', address
233  self.remoteremote = (host, lookup(port))
234  #log('remote %s' % str(self.remote))
235  self._setup_connection_setup_connection()
236  # }}}
237 
240  def _setup_connection(self):
241  self.socketsocket = socket.create_connection(self.remoteremote)
242  if self.tlstls is None:
243  try:
244  assert have_ssl
245  self.socketsocket = ssl.wrap_socket(self.socketsocket, ssl_version = ssl.PROTOCOL_TLSv1)
246  self.tlstls = True
247  except:
248  self.tlstls = False
249  self.socketsocket = socket.create_connection(self.remoteremote)
250  elif self.tlstls is True:
251  try:
252  assert have_ssl
253  self.socketsocket = ssl.wrap_socket(self.socketsocket, ssl_version = ssl.PROTOCOL_TLSv1)
254  except ssl.SSLError as e:
255  raise TypeError('Socket does not seem to support TLS: ' + str(e))
256  else:
257  self.tlstls = False
258  # }}}
259 
262  def disconnect_cb(self, disconnect_cb):
263  self._disconnect_cb_disconnect_cb = disconnect_cb
264  # }}}
265 
267  def close(self):
268  if not self.socketsocket:
269  return b''
270  data = self.unreadunread()
271  self.socketsocket.close()
272  self.socketsocket = None
273  if self.connectionsconnections is not None:
274  self.connectionsconnections.remove(self)
275  if self._disconnect_cb_disconnect_cb:
276  return self._disconnect_cb_disconnect_cb(self, data) or b''
277  return data
278  # }}}
279 
284  def send(self, data):
285  if self.socketsocket is None:
286  return
287  #print 'sending %s' % repr(data)
288  try:
289  self.socketsocket.sendall(data)
290  except BrokenPipeError:
291  self.closeclose()
292  # }}}
293 
299  def sendline(self, data):
300  if self.socketsocket is None:
301  return
302  #print 'sending %s' % repr(data)
303  self.socketsocket.sendall((data + '\n').encode('utf-8'))
304  # }}}
305 
317  def recv(self, maxsize = 4096):
318  if self.socketsocket is None:
319  log('recv on closed socket')
320  raise EOFError('recv on closed socket')
321  ret = b''
322  try:
323  ret = self.socketsocket.recv(maxsize)
324  if hasattr(self.socketsocket, 'pending'):
325  while self.socketsocket.pending():
326  ret += self.socketsocket.recv(maxsize)
327  except:
328  log('Error reading from socket: %s' % sys.exc_info()[1])
329  self.closeclose()
330  return ret
331  if len(ret) == 0:
332  ret = self.closeclose()
333  if not self._disconnect_cb_disconnect_cb:
334  raise EOFError('network connection closed')
335  return ret
336  # }}}
337 
345  def rawread(self, callback, error = None):
346  if self.socketsocket is None:
347  return b''
348  ret = self.unreadunread()
349  self._callback_callback = (callback, None)
350  self._event_event = add_read(self.socketsocket, callback, error)
351  return ret
352  # }}}
353 
364  def read(self, callback, error = None, maxsize = 4096):
365  if self.socketsocket is None:
366  return b''
367  first = self.unreadunread()
368  self._maxsize_maxsize = maxsize
369  self._callback_callback = (callback, False)
370  def cb():
371  data = self.recvrecv(self._maxsize_maxsize)
372  #log('network read %d bytes' % len(data))
373  if not self._event_event:
374  return False
375  callback(data)
376  return True
377  self._event_event = add_read(self.socketsocket, cb, error)
378  if first:
379  callback(first)
380  # }}}
381 
393  def readlines(self, callback, error = None, maxsize = 4096):
394  if self.socketsocket is None:
395  return
396  self._linebuffer_linebuffer = self.unreadunread()
397  self._maxsize_maxsize = maxsize
398  self._callback_callback = (callback, True)
399  self._event_event = add_read(self.socketsocket, self._line_cb_line_cb, error)
400  # }}}
401  def _line_cb(self):
402  self._linebuffer_linebuffer += self.recvrecv(self._maxsize_maxsize)
403  while b'\n' in self._linebuffer_linebuffer and self._event_event:
404  assert self._callback_callback[1] is not None # Going directly from readlines() to rawread() is not allowed.
405  if self._callback_callback[1]:
406  line, self._linebuffer_linebuffer = self._linebuffer_linebuffer.split(b'\n', 1)
407  line = makestr(line)
408  self._callback_callback[0] (line)
409  else:
410  data = makestr(self._linebuffer_linebuffer)
411  self._linebuffer_linebuffer = b''
412  self._callback_callback[0](data)
413  return True
414  # }}}
415 
420  def unread(self):
421  if self._event_event:
422  try:
423  remove_read(self._event_event)
424  except ValueError:
425  # The function already returned False.
426  pass
427  self._event_event = None
428  ret = self._linebuffer_linebuffer
429  self._linebuffer_linebuffer = b''
430  return ret
431  # }}}
432 # }}}
433 
434 
435 class Server:
436 
465  def __init__(self, port, obj, address = '', backlog = 5, tls = False, disconnect_cb = None):
466  self._obj_obj = obj
467 
468  self.portport = ''
469 
470  self.ipv6ipv6 = False
471  self._socket_socket = None
472 
473  self.tlstls = tls
474 
475  self.connectionsconnections = set()
476 
477  self.disconnect_cbdisconnect_cb = disconnect_cb
478  if isinstance(port, str) and '/' in port:
479  # Unix socket.
480  # TLS is ignored for these sockets.
481  self.tlstls = False
482  self._socket_socket = socket.socket(socket.AF_UNIX)
483  self._socket_socket.bind(port)
484  self.portport = port
485  self._socket_socket.listen(backlog)
486  else:
487  self._tls_init_tls_init()
488  port = lookup(port)
489  self._socket_socket = socket.socket()
490  self._socket_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
491  self._socket_socket.bind((address, port))
492  self._socket_socket.listen(backlog)
493  if address == '':
494  self._socket6_socket6 = socket.socket(socket.AF_INET6)
495  self._socket6_socket6.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
496  self._socket6_socket6.bind(('::1', port))
497  self._socket6_socket6.listen(backlog)
498  self.ipv6ipv6 = True
499  self.portport = port
500  self._event_event = add_read(self._socket_socket, lambda: self._cb_cb(False), lambda: self._cb_cb(False))
501  if self.ipv6ipv6:
502  self._event_event = add_read(self._socket6_socket6, lambda: self._cb_cb(True), lambda: self._cb_cb(True))
503  def _cb(self, is_ipv6):
504  if is_ipv6:
505  new_socket = self._socket6_socket6.accept()
506  else:
507  new_socket = self._socket_socket.accept()
508  #log('Accepted connection from %s; possibly attempting to set up encryption' % repr(new_socket))
509  if self.tlstls:
510  assert have_ssl
511  try:
512  new_socket = (ssl.wrap_socket(new_socket[0], ssl_version = ssl.PROTOCOL_TLSv1, server_side = True, certfile = self._tls_cert_tls_cert, keyfile = self._tls_key_tls_key), new_socket[1])
513  except ssl.SSLError as e:
514  log('Rejecting (non-TLS?) connection for %s: %s' % (repr(new_socket[1]), str(e)))
515  try:
516  new_socket[0].shutdown(socket.SHUT_RDWR)
517  except:
518  # Ignore errors here.
519  pass
520  return True
521  except socket.error as e:
522  log('Rejecting connection for %s: %s' % (repr(new_socket[1]), str(e)))
523  try:
524  new_socket[0].shutdown(socket.SHUT_RDWR)
525  except:
526  # Don't care about errors on shutdown.
527  pass
528  return True
529  #log('Accepted TLS connection from %s' % repr(new_socket[1]))
530  s = Socket(new_socket[0], remote = new_socket[1], disconnect_cb = self.disconnect_cbdisconnect_cb, connections = self.connectionsconnections)
531  self._obj_obj(s)
532  return True
533 
536  def close(self):
537  self._socket_socket.close()
538  self._socket_socket = None
539  if self.ipv6ipv6:
540  self._socket6_socket6.close()
541  self._socket6_socket6 = None
542  if isinstance(self.portport, str) and '/' in self.portport:
543  os.remove(self.portport)
544  self.portport = ''
545 
548  def __del__(self):
549  if self._socket_socket is not None:
550  self.closeclose()
551  def _tls_init(self):
552  # Set up members for using tls, if requested.
553  if self.tlstls in (False, '-'):
554  self.tlstls = False
555  return
556  if self.tlstls in (None, True, ''):
557  self.tlstls = fhs.module_get_config('network')['tls']
558  if self.tlstls == '':
559  self.tlstls = socket.getfqdn()
560  elif self.tlstls == '-':
561  self.tlstls = False
562  return
563  # Use tls.
564  fc = fhs.read_data(os.path.join('certs', self.tlstls + os.extsep + 'pem'), opened = False, packagename = 'network')
565  fk = fhs.read_data(os.path.join('private', self.tlstls + os.extsep + 'key'), opened = False, packagename = 'network')
566  if fc is None or fk is None:
567  # Create new self-signed certificate.
568  certfile = fhs.write_data(os.path.join('certs', self.tlstls + os.extsep + 'pem'), opened = False, packagename = 'network')
569  csrfile = fhs.write_data(os.path.join('csr', self.tlstls + os.extsep + 'csr'), opened = False, packagename = 'network')
570  for p in (certfile, csrfile):
571  path = os.path.dirname(p)
572  if not os.path.exists(path):
573  os.makedirs(path)
574  keyfile = fhs.write_data(os.path.join('private', self.tlstls + os.extsep + 'key'), opened = False, packagename = 'network')
575  path = os.path.dirname(keyfile)
576  if not os.path.exists(path):
577  os.makedirs(path, 0o700)
578  os.system('openssl req -x509 -nodes -days 3650 -newkey rsa:4096 -subj "/CN=%s" -keyout "%s" -out "%s"' % (self.tlstls, keyfile, certfile))
579  os.system('openssl req -subj "/CN=%s" -new -key "%s" -out "%s"' % (self.tlstls, keyfile, csrfile))
580  fc = fhs.read_data(os.path.join('certs', self.tlstls + os.extsep + 'pem'), opened = False, packagename = 'network')
581  fk = fhs.read_data(os.path.join('private', self.tlstls + os.extsep + 'key'), opened = False, packagename = 'network')
582  self._tls_cert_tls_cert = fc
583  self._tls_key_tls_key = fk
584  #print(fc, fk)
585 # }}}
586 
587 
590 _timeouts = []
591 
594 _abort = False
595 def _handle_timeouts():
596  now = time.time()
597  while not _abort and len(_timeouts) > 0 and _timeouts[0][0] <= now:
598  _timeouts.pop(0)[1]()
599  if len(_timeouts) == 0:
600  return float('inf')
601  return _timeouts[0][0] - now
602 # }}}
603 
604 
607 _fds = [[], []]
608 
610 def iteration(block = False):
611  # The documentation says timeout should be omitted, it doesn't mention making it None.
612  t = _handle_timeouts()
613  if not block:
614  t = 0
615  #log('do select with timeout %f' % t)
616  if math.isinf(t):
617  ret = select.select(_fds[0], _fds[1], _fds[0] + _fds[1])
618  else:
619  ret = select.select(_fds[0], _fds[1], _fds[0] + _fds[1], t)
620  #log('select returned %s' % repr(ret))
621  for f in ret[2]:
622  if f not in _fds[0] and f not in _fds[1]:
623  continue
624  if not f.error():
625  try:
626  remove_read(f)
627  except ValueError:
628  # The connection was already closed.
629  pass
630  if _abort:
631  return
632  for f in ret[0]:
633  if f not in _fds[0]:
634  continue
635  if not f.handle():
636  try:
637  remove_read(f)
638  except ValueError:
639  # The connection was already closed.
640  pass
641  if _abort:
642  return
643  for f in ret[1]:
644  if f not in _fds[1]:
645  continue
646  if not f.handle():
647  remove_write(f)
648  if _abort:
649  return
650  _handle_timeouts()
651 # }}}
652 
653 
656 _running = False
657 
660 _idle = []
661 
665 def fgloop():
666  global _running
667  assert not _running
668 
671  _running = True
672  try:
673  while _running:
674  iteration(len(_idle) == 0)
675  if not _running:
676  return False
677  for i in _idle[:]:
678  if not i():
679  remove_idle(i)
680  if not _running:
681  break
682  finally:
683 
686  _abort = False
687  return False
688 # }}}
689 
690 
695 def bgloop():
696  assert _running == False
697  if os.getenv('NETWORK_NO_FORK') is None:
698  if os.fork() != 0:
699  sys.exit(0)
700  else:
701  log('Not backgrounding because NETWORK_NO_FORK is set\n')
702  fgloop()
703 # }}}
704 
705 
708 def endloop(force = False):
709  global _running, _abort
710  assert _running
711 
714  _running = False
715  if force:
716 
719  _abort = True
720 # }}}
721 
722 class _fd_wrap:
723  def __init__(self, fd, cb, error):
724  self.fd = fd
725  self.handle = cb
726  if error is not None:
727  self.error = error
728  else:
729  self.error = self.default_error
730  def fileno(self):
731  if isinstance(self.fd, int):
732  return self.fd
733  else:
734  return self.fd.fileno()
735  def default_error(self):
736  try:
737  remove_read(self)
738  log('Error returned from select; removed fd from read list')
739  except:
740  try:
741  remove_write(self)
742  log('Error returned from select; removed fd from write list')
743  except:
744  log('Error returned from select, but fd was not in read or write list')
745 # }}}
746 
747 def add_read(fd, cb, error = None):
748  _fds[0].append(_fd_wrap(fd, cb, error))
749  #log('add read %s' % repr(_fds[0][-1]))
750  return _fds[0][-1]
751 # }}}
752 
753 def add_write(fd, cb, error = None):
754  _fds[1].append(_fd_wrap(fd, cb, error))
755  return _fds[1][-1]
756 # }}}
757 
758 def add_timeout(abstime, cb):
759  _timeouts.append([abstime, cb])
760  ret = _timeouts[-1]
761  _timeouts.sort()
762  return ret
763 # }}}
764 
765 def add_idle(cb):
766  _idle.append(cb)
767  return _idle[-1]
768 # }}}
769 
770 def remove_read(handle):
771  #log('remove read %s' % repr(handle))
772  #traceback.print_stack()
773  _fds[0].remove(handle)
774 # }}}
775 
776 def remove_write(handle):
777  _fds[1].remove(handle)
778 # }}}
779 
780 def remove_timeout(handle):
781  _timeouts.remove(handle)
782 # }}}
783 
784 def remove_idle(handle):
785  _idle.remove(handle)
786 # }}}
tls
False or the hostname for which the TLS keys are used.
Definition: network.py:473
def close(self)
Stop the server.
Definition: network.py:536
ipv6
Whether the server listens for IPv6.
Definition: network.py:470
port
Port that is listened on.
Definition: network.py:468
disconnect_cb
Disconnect handler, to be used for new sockets.
Definition: network.py:477
connections
Currently active connections for this server.
Definition: network.py:475
Listen on a network port and accept connections.
Definition: network.py:435
def recv(self, maxsize=4096)
Read data from the network.
Definition: network.py:317
connections
connections set where this socket is registered.
Definition: network.py:210
def disconnect_cb(self, disconnect_cb)
Change the callback for disconnect notification.
Definition: network.py:262
def read(self, callback, error=None, maxsize=4096)
Register function to be called when data is received.
Definition: network.py:364
def readlines(self, callback, error=None, maxsize=4096)
Buffer incoming data until a line is received, then call a function.
Definition: network.py:393
def send(self, data)
Send data over the network.
Definition: network.py:284
def close(self)
Close the network connection.
Definition: network.py:267
remote
remote end of the network connection.
Definition: network.py:208
def sendline(self, data)
Send a line of text.
Definition: network.py:299
tls
read only variable which indicates whether TLS encryption is used on this socket.
Definition: network.py:206
def rawread(self, callback, error=None)
Register function to be called when data is ready for reading.
Definition: network.py:345
socket
underlying socket object.
Definition: network.py:214
def unread(self)
Cancel a read() or rawread() callback.
Definition: network.py:420
Connection object.
Definition: network.py:187
def add_timeout(abstime, cb)
Definition: network.py:758
def bgloop()
Like fgloop, but forks to the background.
Definition: network.py:695
makestr
Definition: network.py:72
def add_write(fd, cb, error=None)
Definition: network.py:753
def lookup(service)
Convert int or str with int or service to int port.
Definition: network.py:123
def add_read(fd, cb, error=None)
Definition: network.py:747
def remove_write(handle)
Definition: network.py:776
def remove_idle(handle)
Definition: network.py:784
def iteration(block=False)
Do a single iteration of the main loop.
Definition: network.py:610
def log(*message, filename=None, line=None, funcname=None, depth=0)
Log a message.
Definition: network.py:102
def wrap(i, o=None)
Wrap two files into a fake socket.
Definition: network.py:181
def fgloop()
Wait for events and handle them.
Definition: network.py:665
def set_log_output(file)
Change target for log().
Definition: network.py:85
def add_idle(cb)
Definition: network.py:765
def endloop(force=False)
Stop a loop that was started with fgloop() or bgloop().
Definition: network.py:708
def remove_timeout(handle)
Definition: network.py:780
def remove_read(handle)
Definition: network.py:770