Source code for photonpump.messages

import datetime
import json
import math
import struct
from collections import namedtuple
from enum import IntEnum
from typing import Any, Dict, Sequence
from uuid import UUID, uuid4

from . import messages_pb2

HEADER_LENGTH = 1 + 1 + 16
SIZE_UINT_32 = 4
_LENGTH = struct.Struct("<I")
_HEAD = struct.Struct("<BBIIII")

ROUND_ROBIN = "RoundRobin"
DISPATCH_TO_SINGLE = "DisptchToSingle"
PINNED = "Pinned"


def make_enum(descriptor):
    vals = [(x.name, x.number) for x in descriptor.values]

    return IntEnum(descriptor.name, vals)


[docs]class TcpCommand(IntEnum): HeartbeatRequest = 0x01 HeartbeatResponse = 0x02 Ping = 0x03 Pong = 0x04 WriteEvents = 0x82 WriteEventsCompleted = 0x83 Read = 0xB0 ReadEventCompleted = 0xB1 ReadStreamEventsForward = 0xB2 ReadStreamEventsForwardCompleted = 0xB3 ReadStreamEventsBackward = 0xB4 ReadStreamEventsBackwardCompleted = 0xB5 ReadAllEventsForward = 0xB6 ReadAllEventsForwardCompleted = 0xB7 ReadAllEventsBackward = 0xB8 ReadAllEventsBackwardCompleted = 0xB9 SubscribeToStream = 0xC0 SubscriptionConfirmation = 0xC1 StreamEventAppeared = 0xC2 UnsubscribeFromStream = 0xC3 SubscriptionDropped = 0xC4 ConnectToPersistentSubscription = 0xC5 PersistentSubscriptionConfirmation = 0xC6 PersistentSubscriptionStreamEventAppeared = 0xC7 CreatePersistentSubscription = 0xC8 CreatePersistentSubscriptionCompleted = 0xC9 DeletePersistentSubscription = 0xCA DeletePersistentSubscriptionCompleted = 0xCB PersistentSubscriptionAckEvents = 0xCC PersistentSubscriptionNakEvents = 0xCD UpdatePersistentSubscription = 0xCE UpdatePersistentSubscriptionCompleted = 0xCF BadRequest = 0xF0 NotHandled = 0xF1 Authenticate = 0xF2 Authenticated = 0xF3 NotAuthenticated = 0xF4 IdentifyClient = 0xF5 ClientIdentified = 0xF6
[docs]class StreamDirection(IntEnum): Forward = 0 Backward = 1
[docs]class ContentType(IntEnum): Json = 0x01 Binary = 0x00
[docs]class OperationFlags(IntEnum): Empty = 0x00 Authenticated = 0x01
OperationResult = make_enum(messages_pb2._OPERATIONRESULT) NotHandledReason = make_enum(messages_pb2._NOTHANDLED_NOTHANDLEDREASON) SubscriptionDropReason = make_enum( messages_pb2._SUBSCRIPTIONDROPPED_SUBSCRIPTIONDROPREASON ) class Credential: def __init__(self, username, password): self.username = username self.password = password username_bytes = username.encode("UTF-8") password_bytes = password.encode("UTF-8") self.length = 2 + len(username_bytes) + len(password_bytes) self.bytes = bytearray() self.bytes.extend(len(username_bytes).to_bytes(1, byteorder="big")) self.bytes.extend(username_bytes) self.bytes.extend(len(password_bytes).to_bytes(1, byteorder="big")) self.bytes.extend(password_bytes) @classmethod def from_bytes(cls, data): """ I am so sorry. """ len_username = int.from_bytes(data[0:2], byteorder="big") offset_username = 2 + len_username username = data[2:offset_username].decode("UTF-8") offset_password = 2 + offset_username len_password = int.from_bytes( data[offset_username:offset_password], byteorder="big" ) pass_begin = offset_password pass_end = offset_password + len_password password = data[pass_begin:pass_end].decode("UTF-8") return cls(username, password) class InboundMessage: def __init__( self, conversation_id: UUID, command: TcpCommand, payload: bytes = None ) -> None: self.conversation_id = conversation_id self.command = command self.payload = payload or b"" self.data_length = len(payload) self.length = HEADER_LENGTH + self.data_length @property def header_bytes(self): data = bytearray(SIZE_UINT_32 + 2) struct.pack_into("<IBB", data, 0, self.length, self.command, 0) data.extend(self.conversation_id.bytes_le) return data def __repr__(self): return self.__str__() + "\n" + dump(self.header_bytes, self.payload) def __str__(self): return "%s (%s) of %s flags=%d" % ( TcpCommand(self.command).name, self.conversation_id, sizeof_fmt(self.length), 0, ) class OutboundMessage: def __init__( self, conversation_id: UUID, command: TcpCommand, payload: Any, creds: Credential = None, one_way: bool = False, require_master=False, ) -> None: self.conversation_id = conversation_id self.command = command self.payload = payload self.creds = creds self.require_master = require_master self.data_length = len(payload) self.length = HEADER_LENGTH + self.data_length if self.creds: self.length += creds.length self.one_way = one_way @property def header_bytes(self): data = bytearray(SIZE_UINT_32 + 2) struct.pack_into( "<IBB", data, 1 if self.require_master else 0, self.length, self.command.value, 1 if self.creds else 0, ) data.extend(self.conversation_id.bytes_le) if self.creds: data.extend(self.creds.bytes) return data def __repr__(self): return dump(self.header_bytes, self.payload) def __str__(self): return "%s (%s) of %s flags=%d" % ( TcpCommand(self.command).name, self.conversation_id, sizeof_fmt(self.length), 0, ) def __eq__(self, other): return isinstance(other, OutboundMessage) and repr(self) == repr(other)
[docs]class ExpectedVersion(IntEnum): """Static values for concurrency control Attributes: Any: No concurrency control. StreamMustNotExist: The request should fail if the stream already exists. StreamMustBeEmpty: The request should fail if the stream does not exist, or if the stream already contains events. StreamMustExist: The request should fail if the stream does not exist. """ Any = -2 StreamMustNotExist = -1 StreamMustBeEmpty = 0 StreamMustExist = -4
JsonDict = Dict[str, Any] Header = namedtuple( "photonpump_result_header", ["size", "cmd", "flags", "correlation_id", "username", "password"], ) def parse_header(length: bytearray, data: bytearray) -> Header: (size,) = _LENGTH.unpack(length) return Header(size, data[0], data[1], UUID(bytes_le=data[2:18]), None, None) def sizeof_fmt(num, suffix="B"): for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: if abs(num) < 1024.0: return "%3.1f%s%s" % (num, unit, suffix) num /= 1024.0 return "%.1f%s%s" % (num, "Yi", suffix) def print_header(header): return "%s (%s) of %s flags=%d" % ( TcpCommand(header.cmd).name, header.correlation_id, sizeof_fmt(header.size), header.flags, ) Header.__repr__ = print_header Header.__new__.__defaults__ = (None, None) NewEventData = namedtuple("photonpump_event", ["id", "type", "data", "metadata"]) EventRecord = namedtuple( "photonpump_eventrecord", ["stream", "id", "event_number", "type", "data", "metadata", "created"], ) def _json(self): return json.loads(self.data.decode("UTF-8")) EventRecord.json = _json class Event: def __init__(self, event: EventRecord, link: EventRecord) -> None: self.event = event self.link = link self.stream = event.stream self.id = event.id self.event_number = event.event_number self.type = event.type self.data = event.data self.metadata = event.metadata self.created = event.created @property def original_event(self) -> EventRecord: return self.link or self.event @property def original_event_id(self) -> UUID: return self.original_event.id def json(self): return json.loads(self.data.decode("UTF-8")) def __repr__(self): return f"<Event: {self.event_number}@{self.stream}:{self.type}>"
[docs]class StreamSlice(list): def __init__( self, events: Sequence[Event], next_event_number: int, last_event_number: int, prepare_position: int = None, commit_position: int = None, is_end_of_stream: bool = False, ) -> None: super().__init__(events) self.next_event_number = next_event_number self.last_event_number = last_event_number self.prepare_position = prepare_position self.commit_position = commit_position self.is_end_of_stream = is_end_of_stream self.events = events
def dump(*chunks: bytearray): data = bytearray() for chunk in chunks: data.extend(chunk) length = len(data) rows = length / 16 dump = [] for i in range(0, math.ceil(rows)): offset = i * 16 row = data[offset : offset + 16] hex = "{0: <47}".format(" ".join("{:02x}".format(x) for x in row)) dump.append("%06d: %s | %a" % (offset, hex, bytes(row))) return "\n" + "\n".join(dump) def _make_event(record: messages_pb2.ResolvedEvent): link = ( EventRecord( record.link.event_stream_id, UUID(bytes_le=bytes(record.link.event_id)), record.link.event_number, record.link.event_type, record.link.data, record.link.metadata, datetime.datetime.fromtimestamp(record.link.created_epoch / 1e3), ) if record.HasField("link") else None ) if not record.HasField("event"): return Event(link, None) event = EventRecord( record.event.event_stream_id, UUID(bytes_le=bytes(record.event.event_id)), record.event.event_number, record.event.event_type, record.event.data, record.event.metadata, datetime.datetime.fromtimestamp(record.event.created_epoch / 1e3), ) return Event(event, link)
[docs]def NewEvent( type: str, id: UUID = None, data: JsonDict = None, metadata: JsonDict = None ) -> NewEventData: """Build the data structure for a new event. Args: type: An event type. id: The uuid identifier for the event. data: A dict containing data for the event. These data must be json serializable. metadata: A dict containing metadata about the event. These must be json serializable. """ return NewEventData(id or uuid4(), type, data, metadata)
ReadEventResult = make_enum(messages_pb2._READEVENTCOMPLETED_READEVENTRESULT) ReadStreamResult = make_enum(messages_pb2._READSTREAMEVENTSCOMPLETED_READSTREAMRESULT) SubscriptionResult = make_enum( messages_pb2._CREATEPERSISTENTSUBSCRIPTIONCOMPLETED_CREATEPERSISTENTSUBSCRIPTIONRESULT ) class SubscriptionCreatedResponse: def __init__(self, result: SubscriptionResult, reason: str) -> None: self.reason = reason self.result = result