Package ldaptor :: Package test :: Module util
[hide private]
[frames] | no frames]

Source Code for Module ldaptor.test.util

  1  from twisted.python import failure 
  2  from twisted.internet import reactor, protocol, address, error 
  3  from twisted.test import testutils 
  4  from twisted.trial import unittest 
  5   
  6  from StringIO import StringIO 
  7   
8 -class FakeTransport(protocol.FileWrapper):
9 disconnecting = False 10 disconnect_done = False
11 - def __init__(self, addr, peerAddr):
12 self.data = StringIO() 13 protocol.FileWrapper.__init__(self, self.data) 14 self.addr = addr 15 self.peerAddr = peerAddr
16 - def getHost(self):
17 return self.addr
18 - def getPeer(self):
19 return self.peerAddr
20 - def loseConnection(self):
21 self.disconnecting = True
22
23 -class FasterIOPump(testutils.IOPump):
24 - def pump(self):
25 """Move data back and forth. 26 27 Returns whether any data was moved. 28 """ 29 self.clientIO.seek(0) 30 self.serverIO.seek(0) 31 cData = self.clientIO.read() 32 sData = self.serverIO.read() 33 self.clientIO.seek(0) 34 self.serverIO.seek(0) 35 self.clientIO.truncate() 36 self.serverIO.truncate() 37 self.server.dataReceived(cData) 38 self.client.dataReceived(sData) 39 if cData or sData: 40 return 1 41 else: 42 return 0
43
44 -class IOPump(FasterIOPump):
45 active = []
46 - def __init__(self, 47 client, server, 48 clientTransport, serverTransport):
49 self.clientTransport = clientTransport 50 self.serverTransport = serverTransport 51 testutils.IOPump.__init__(self, 52 client=client, 53 server=server, 54 clientIO=clientTransport.data, 55 serverIO=serverTransport.data) 56 self.active.append(self)
57 - def pump(self):
58 FasterIOPump.pump(self) 59 if (self.clientTransport.disconnecting 60 and not self.clientTransport.data.getvalue() 61 and not self.clientTransport.disconnect_done): 62 self.server.connectionLost(error.ConnectionDone) 63 self.clientTransport.disconnect_done = True 64 65 if (self.serverTransport.disconnecting 66 and not self.serverTransport.data.getvalue() 67 and not self.serverTransport.disconnect_done): 68 self.client.connectionLost(error.ConnectionDone) 69 self.serverTransport.disconnect_done = True 70 71 if (self.clientTransport.disconnect_done 72 and self.serverTransport.disconnect_done): 73 self.active.remove(self)
74
75 - def __repr__(self):
76 return '<%s client=%r/%r server=%r/%r>' % ( 77 self.__class__.__name__, 78 self.client, 79 self.clientIO.getvalue(), 80 self.server, 81 self.serverIO.getvalue(), 82 83 )
84 -def returnConnected(server, client, 85 clientAddress=None, 86 serverAddress=None):
87 """Take two Protocol instances and connect them. 88 """ 89 if serverAddress is None: 90 serverAddress = address.IPv4Address('TCP', 'localhost', 1) 91 if clientAddress is None: 92 clientAddress = address.IPv4Address('TCP', 'localhost', 2) 93 clientTransport = FakeTransport(clientAddress, serverAddress) 94 client.makeConnection(clientTransport) 95 serverTransport = FakeTransport(serverAddress, clientAddress) 96 server.makeConnection(serverTransport) 97 pump = IOPump(client, server, 98 clientTransport, 99 serverTransport) 100 # Challenge-response authentication: 101 pump.flush() 102 # Uh... 103 pump.flush() 104 return pump
105
106 -def _append(result, lst):
107 lst.append(result)
108
109 -def _getDeferredResult(d, timeout=None):
110 if timeout is not None: 111 d.setTimeout(timeout) 112 resultSet = [] 113 d.addBoth(_append, resultSet) 114 while not resultSet: 115 for pump in IOPump.active: 116 pump.pump() 117 reactor.iterate() 118 return resultSet[0]
119
120 -def pumpingDeferredResult(d, timeout=None):
121 result = _getDeferredResult(d, timeout) 122 if isinstance(result, failure.Failure): 123 if result.tb: 124 raise result.value.__class__, result.value, result.tb 125 raise result.value 126 else: 127 return result
128 129
130 -def pumpingDeferredError(d, timeout=None):
131 result = _getDeferredResult(d, timeout) 132 if isinstance(result, failure.Failure): 133 return result 134 else: 135 raise unittest.FailTest, "Deferred did not fail: %r" % (result,)
136