general purpose bidirectional packet stream connection
A general purpose bidirectional packet stream connection.
Latest release 20240630:
- New ERQ_Packet to indicate end of client requests.
- PacketConnection.startup_shutdown: send ERQ_Packet at stream shutdown.
- New PacketConnection.end_requests() method to queue an ERQ_Packet to the send queue.
- PacketConnection: send the EOF_Packet from the send worker instead of from startup_shutdown.
- Rename PacketConnection.do to PacketConnection.__call__.
- PacketStream: BREAKING: replace the recv and send parameters with a single recv_send parameter.
- PacketStream: do not close supplied connection handles - this allows reuse of an underlying binary connection.
- Many many logic fixes for clean and orderly shutdown.
- Rename PacketConnection.request to PacketConnection.submit.
- New BaseRequest and HasPacketConnection helper classes to make it easy to use a PacketConnection for a protocol.
Class BaseRequest(cs.binary.AbstractBinary)
A base class for request classes to use with HasPacketConnection.
This is a mixin aimed at *Binary classes representing the
request payload and supplies an __init__ method which saves
the optional flags parameter as .flags and passes all
other parameters to the superclass' __init__
(the *Binary superclass).
As such, it is important to define the subclass like this:
    class AddRequest(
        BaseRequest,
        BinaryMultiValue('AddRequest', dict(hashenum=BSUInt, data=BSData)),
    ):
with BaseRequest first if you wish to omit an __init__ method.
Subclasses must implement the fulfil method to perform the
request operation.
Often a subclass will also implement the
decode_response_payload(flags,payload) method.
This provides the result of a request, returned by
HasPacketConnection.conn_do_remote or by the Result
returned from HasPacketConnection.conn_submit.
The default returns the response flags and payload directly.
This base class subclasses AbstractBinary to encode and
decode the request payload and has an additional flags
attribute for the Packet.flags. As such, subclasses have
two main routes for implementation:
1: Subclass an existing AbstractBinary subclass. For example,
the cs.vt.stream.ContainsRequest looks up a hash code, and
subclasses the cs.vt.hash.HashField class.
2: Provide a parse(bfr) factory method and transcribe()
method to parse and transcribe the request payload like any
other AbstractBinary subclass.
Approach 1 does not necessarily need a distinct class;
a binary class can often be constructed in the class header.
For example, the cs.vt.stream.AddRequest payload is an int
representing the hash class and the data to add. The class
header looks like this:
    class AddRequest(
        BaseRequest,
        BinaryMultiValue('AddRequest', dict(hashenum=BSUInt, data=BSData)),
    ):
much as one might subclass a namedtuple in other circumstances.
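The effect of the base class order can be illustrated without cs.binary at all. The following is a minimal sketch using hypothetical stand-ins: FlagsMixin plays the role of BaseRequest and PayloadFields plays the role of the *Binary superclass. It only shows why the mixin has to come first for its __init__ to be used; it is not how the real classes are implemented.

```python
class FlagsMixin:
    """Hypothetical stand-in for BaseRequest: save `flags`, forward the rest."""

    def __init__(self, *args, flags=0, **kwargs):
        self.flags = flags
        # Everything except `flags` goes to the next class in the MRO.
        super().__init__(*args, **kwargs)


class PayloadFields:
    """Hypothetical stand-in for a *Binary payload class."""

    def __init__(self, hashenum, data):
        self.hashenum = hashenum
        self.data = data


class AddRequest(FlagsMixin, PayloadFields):
    """No __init__ needed: FlagsMixin.__init__ is found first in the MRO."""


rq = AddRequest(hashenum=1, data=b'chunk', flags=2)
```

With the bases reversed, PayloadFields.__init__ would be found first and would reject the flags keyword.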
Method BaseRequest.decode_response_payload(self, flags: int, payload: bytes):
Decode a response flags and payload.
This default implementation returns the flags and payload unchanged.
Method BaseRequest.from_request_payload(flags: int, payload: bytes) -> 'BaseRequest':
Decode a request flags and payload, return a BaseRequest instance.
This is called with the correct BaseRequest subclass
derived from the received Packet.rq_type.
It decodes the
This default implementation assumes that flags==0
and calls cls.from_bytes(payload).
Method BaseRequest.fulfil(self, context) -> Union[NoneType, int, bytes, str, Tuple[int, bytes]]:
Fulfil this request at the receiving end of the connection using
context, some outer object using the connection.
Raise an exception if the request cannot be fulfilled.
Return values suitable for the response:
- None: equivalent to (0, b'')
- int: returned in the flags with b'' for the payload
- bytes: returned as the payload with 0 as the flags
- str: return encode(s, 'ascii') as the payload with 0 as the flags
- (int, bytes): the flags and payload
A typical implementation looks like this:
    def fulfil(self, context):
        return context.some_method(params...)
where params come from the request attributes.
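The return value conventions for fulfil can be read as a normalisation to a (flags, payload) pair. The sketch below shows one way to express that mapping; the function name normalise_result is hypothetical and this is not the library's own code.

```python
from typing import Tuple, Union

FulfilResult = Union[None, int, bytes, str, Tuple[int, bytes]]


def normalise_result(result: FulfilResult) -> Tuple[int, bytes]:
    """Map a fulfil() return value to a (flags, payload) pair.

    Hypothetical helper following the conventions documented for fulfil.
    """
    if result is None:
        return 0, b''
    if isinstance(result, int):
        # flags only, empty payload
        return result, b''
    if isinstance(result, bytes):
        # payload only, zero flags
        return 0, result
    if isinstance(result, str):
        # payload only, encoded as ASCII per the fulfil documentation
        return 0, result.encode('ascii')
    flags, payload = result
    return flags, payload
```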
Class HasPacketConnection
This is a mixin class to aid writing classes which use a
PacketConnection to communicate with some service.
The supported request/response packet types are provided as
a mapping of int Packet.rq_type values to a class
implementing that request type, a subclass of BaseRequest.
For example, a cs.vt.stream.StreamStore subclasses HasPacketConnection
and initialises the mixin with this call:
    HasPacketConnection.__init__(
        self,
        recv_send,
        name,
        { 0: AddRequest,  # add chunk, return hashcode
          1: GetRequest,  # get chunk from hashcode
          .......
        },
    )
See the BaseRequest class for details on how to implement
each request type.
Method HasPacketConnection.__init__(self, recv_send: Union[Tuple[Union[int, cs.packetstream.ReadableFile, cs.buffer.CornuCopyBuffer], Union[int, cs.packetstream.SendableFile]], Callable[[], Tuple[Union[int, cs.packetstream.ReadableFile, cs.buffer.CornuCopyBuffer], Union[int, cs.packetstream.SendableFile], Callable[[], NoneType]]]], name: str = None, *, rq_type_map: Mapping[int, cs.packetstream.BaseRequest], **packet_kw):
Initialise self.conn as a PacketConnection.
Parameters:
- recv_send: as for PacketConnection
- name: an optional name for the connection
- rq_type_map: a mapping of request types to BaseRequest subclasses
Other keyword arguments are passed to PacketConnection().
Method HasPacketConnection.conn_do_remote(self, rq: cs.packetstream.BaseRequest, **submit_kw):
Run rq remotely.
Raises ValueError if the response is not ok.
Otherwise returns rq.decode_response_payload(flags, payload).
Method HasPacketConnection.conn_handle_request(self, rq_type: int, flags: int, payload: bytes):
Handle receipt of a request packet.
Decode the packet into a request rq and return rq.fulfil(self).
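The dispatch described for conn_handle_request can be sketched with plain classes. Everything here is a hypothetical stand-in (EchoRequest, Service and its echo method are illustrative names only): the mapping selects a class from rq_type, the class decodes the payload, and the resulting request is fulfilled against the outer object.

```python
class EchoRequest:
    """Hypothetical request type whose payload is returned unchanged."""

    def __init__(self, data: bytes):
        self.data = data

    @classmethod
    def from_request_payload(cls, flags: int, payload: bytes) -> "EchoRequest":
        # Mirror the documented default, which expects flags == 0.
        if flags != 0:
            raise ValueError(f"unexpected flags: {flags!r}")
        return cls(payload)

    def fulfil(self, context):
        return context.echo(self.data)


class Service:
    """Hypothetical outer object; it plays the role of the `context`."""

    rq_type_map = {0: EchoRequest}

    def echo(self, data: bytes) -> bytes:
        return data

    def conn_handle_request(self, rq_type: int, flags: int, payload: bytes):
        rq_class = self.rq_type_map[rq_type]  # KeyError for unknown types
        rq = rq_class.from_request_payload(flags, payload)
        return rq.fulfil(self)
```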
Method HasPacketConnection.conn_submit(self, rq: cs.packetstream.BaseRequest, *, channel=0, label=None) -> cs.result.Result:
Submit this request to the connection, return a Result.
Class Packet(cs.binary.SimpleBinary)
A protocol packet.
Method Packet.__str__(self):
Method Packet.parse(bfr, log=None):
Parse a Packet from a buffer.
Method Packet.transcribe(self):
Transcribe this packet.
Method Packet.write(self, file, flush=False, log=None):
Write the Packet to file.
Class PacketConnection(cs.resources.MultiOpenMixin)
A bidirectional binary connection for exchanging requests and responses.
Method PacketConnection.__init__(self, recv_send: Union[Tuple[Union[int, cs.packetstream.ReadableFile, cs.buffer.CornuCopyBuffer], Union[int, cs.packetstream.SendableFile]], Callable[[], Tuple[Union[int, cs.packetstream.ReadableFile, cs.buffer.CornuCopyBuffer], Union[int, cs.packetstream.SendableFile], Callable[[], NoneType]]]], name=None, *, request_handler=None, packet_grace=None, trace_log: Optional[Callable] = None):
Initialise the PacketConnection.
Parameters:
- recv_send: specify the receive and send streams
- packet_grace: default pause in the packet sending worker to allow another packet to be queued before flushing the output stream. Default: DEFAULT_PACKET_GRACE seconds. A value of 0 will flush immediately if the queue is empty.
- request_handler: an optional callable accepting (rq_type, flags, payload). The request_handler may return one of 5 values on success:
  - None: response will be 0 flags and an empty payload.
  - int: flags only. Response will be the flags and an empty payload.
  - bytes: payload only. Response will be 0 flags and the payload.
  - str: payload only. Response will be 0 flags and the str encoded as bytes using UTF-8.
  - (int, bytes): Specify flags and payload for response.
  An unsuccessful request should raise an exception, which will cause a failure response packet.
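To make the packet_grace behaviour described above concrete, here is a minimal sketch of a send loop that pauses briefly before flushing when its queue runs dry. The function send_worker and the None sentinel are assumptions made for illustration; this is not the library's actual worker.

```python
import queue
import time


def send_worker(q: queue.Queue, file, packet_grace: float = 0.01) -> None:
    """Write queued byte packets to `file`, batching flushes.

    Hypothetical sketch: a `None` item in the queue ends the loop.
    """
    while True:
        bs = q.get()
        if bs is None:
            break
        file.write(bs)
        if q.empty():
            # Give the producer a moment to queue another packet.
            if packet_grace > 0:
                time.sleep(packet_grace)
            # Still nothing queued: flush what has been written so far.
            if q.empty():
                file.flush()
    file.flush()
```

When several packets are already queued, they are written back to back and flushed once, which is the point of the grace period.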
The recv_send parameter is used to prepare the connection.
It may take the following forms:
- a 2-tuple of (recv, send) specifying the receive and send streams
- an int specifying a single file descriptor used for both receive and send
- a callable returning a 3-tuple of (recv, send, close) as for PacketConnection's callable mode
The (recv, send) pair indicate the inbound and outbound binary streams.
For preexisting streams such as pipes or sockets these can be:
- recv: anything acceptable to CornuCopyBuffer.promote(), typically a file descriptor or a binary file with a .read method.
- send: a file descriptor or a binary file with .write and .flush methods.
For "on demand" use, recv may be a callable and send may be None.
In this case, recv() must return a 3-tuple of
(recv,send,shutdown) being values for recv and send
as above, and a shutdown function to do the necessary "close"
of the new recv and send. shutdown may be None if there is
no meaningful close operation.
The PacketConnection's startup_shutdown method will
call recv() to obtain the binary streams and call the
shutdown on completion.
This supports use for on demand connections, eg:
    P = PacketConnection(connect_to_server)
    ......
    with P:
        ... use P to do work ...
where connect_to_server() might connect to some remote service.
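The shape of such an on demand callable can be sketched with the standard library alone. In this hypothetical connect_to_server, a local pipe stands in for a real remote service, so anything written to send comes straight back on recv. It only illustrates the (recv, send, shutdown) return convention described above.

```python
import os


def connect_to_server():
    """Return (recv, send, shutdown) for a freshly made connection.

    Hypothetical stand-in: a loopback pipe instead of a remote service.
    """
    rfd, wfd = os.pipe()
    recv = os.fdopen(rfd, 'rb')
    send = os.fdopen(wfd, 'wb')

    def shutdown():
        # Close both ends once the connection is finished with.
        send.close()
        recv.close()

    return recv, send, shutdown
```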
Method PacketConnection.__call__(self, rq_type, payload=b'', flags=0, *, decode_response=None, channel=0, label=None):
Calling the PacketConnection performs a synchronous request.
Submits the request, then calls the Result returned from the request.
Method PacketConnection.join(self):
Wait for the send and receive workers to terminate.
Method PacketConnection.join_recv(self):
Wait for the end of the receive worker.
Servers should call this.
Method PacketConnection.send_eof(self):
Queue the magic EOF Packet.
Method PacketConnection.send_erq(self):
Queue the magic end-of-requests Packet.
Method PacketConnection.submit(self, rq_type: int, flags: int = 0, payload: bytes = b'', *, decode_response=None, channel=0, label=None) -> cs.result.Result:
Compose and dispatch a new request, returns a Result.
Allocates a new tag, a Result to deliver the response, and
records the response decode function for use when the
response arrives.
Parameters:
- rq_type: request type code, an int
- flags: optional flags to accompany the request, an int; default 0.
- payload: optional bytes-like object to accompany the request; default b''
- decode_response: optional callable accepting (response_flags, response_payload_bytes) and returning the decoded response payload value; if unspecified, the response payload bytes are used
- label: optional label for this request to aid debugging
The Result will yield an (ok, flags, payload) tuple, where:
- ok: whether the request was successful
- flags: the response flags
- payload: the response payload, decoded by decode_response if specified
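The bookkeeping described for submit (a new tag, a result object, and a remembered decode function) can be sketched as follows. This is an illustration under assumptions: concurrent.futures.Future stands in for cs.result.Result, the PendingRequests class and its deliver method are hypothetical, and no locking is shown.

```python
from concurrent.futures import Future
from itertools import count


class PendingRequests:
    """Hypothetical tag -> (result, decode_response) registry."""

    def __init__(self):
        self._tags = count(1)
        self._pending = {}

    def submit(self, decode_response=None):
        """Allocate a tag and a Future which will receive the response."""
        tag = next(self._tags)
        result = Future()
        self._pending[tag] = (result, decode_response)
        return tag, result

    def deliver(self, tag, ok, flags, payload):
        """Complete the Future for `tag` with an (ok, flags, payload) tuple."""
        result, decode_response = self._pending.pop(tag)
        if ok and decode_response is not None:
            payload = decode_response(flags, payload)
        result.set_result((ok, flags, payload))
```

A caller would keep the returned result and ask it for its value once the matching response packet has been delivered.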
Class ReadableFile(typing.Protocol)
The requirements for a file used to receive.
Method ReadableFile.__subclasshook__(other):
Set (or override) the protocol subclass hook.
Method ReadableFile.read(self, size: int) -> bytes:
Read up to size bytes.
Class RequestState(RequestState)
A state object tracking a particular request.
Method RequestState.cancel(self):
Cancel this request.
Method RequestState.complete(self, flags, payload):
Complete the request from an "ok" flags and payload.
Method RequestState.fail(self, flags, payload):
Fail the request from a "not ok" flags and payload.
Class SendableFile(typing.Protocol)
The requirements for a file used to send.
Method SendableFile.__subclasshook__(other):
Set (or override) the protocol subclass hook.
Method SendableFile.flush(self) -> None:
Flush any buffer of written bytes.
Method SendableFile.write(self, bs: bytes) -> int:
Write bytes, return the number of bytes written.
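For readers unfamiliar with typing.Protocol, the two file protocols can be approximated as below. These are local, hypothetical copies built from the method signatures listed here, and the use of runtime_checkable is an assumption made so that isinstance works in the sketch; the real classes define their own __subclasshook__.

```python
import io
from typing import Protocol, runtime_checkable


@runtime_checkable
class ReadableFile(Protocol):
    """Anything with a read(size) method returning bytes."""

    def read(self, size: int) -> bytes: ...


@runtime_checkable
class SendableFile(Protocol):
    """Anything with write(bs) and flush() methods."""

    def write(self, bs: bytes) -> int: ...

    def flush(self) -> None: ...


# An in-memory binary file satisfies both protocols structurally.
buf = io.BytesIO()
both = isinstance(buf, ReadableFile) and isinstance(buf, SendableFile)
```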
Release Log
Release 20240630:
- New ERQ_Packet to indicate end of client requests.
- PacketConnection.startup_shutdown: send ERQ_Packet at stream shutdown.
- New PacketConnection.end_requests() method to queue an ERQ_Packet to the send queue.
- PacketConnection: send the EOF_Packet from the send worker instead of from startup_shutdown.
- Rename PacketConnection.do to PacketConnection.__call__.
- PacketStream: BREAKING: replace the recv and send parameters with a single recv_send parameter.
- PacketStream: do not close supplied connection handles - this allows reuse of an underlying binary connection.
- Many many logic fixes for clean and orderly shutdown.
- Rename PacketConnection.request to PacketConnection.submit.
- New BaseRequest and HasPacketConnection helper classes to make it easy to use a PacketConnection for a protocol.
Release 20240412:
- PacketConnection: now subclasses MultiOpenMixin, big refactor.
- PacketConnection.__init__: use @promote to turn the recv parameter into a CornuCopyBuffer.
- Fix a deadlock.
Release 20211208:
- Packet.__eq__: only test .rq_type if .is_request.
- Update tests for changes.
Release 20210306:
- Port to new cs.binary.Binary* classes.
- Some refactors and small fixes.
Release 20191004:
- PacketConnection: new optional parameter packet_grace to tune the send delay for additional packets before a flush, default DEFAULT_PACKET_GRACE (0.01s), 0 for no delay.
- Add a crude packet level activity ticker.
Release 20190221: DISTINFO requirement updates.
Release 20181228: Initial PyPI release.