import array
import asyncio
import enum
import logging
import struct
import uuid
from typing import Any, NamedTuple, Optional, Sequence
from . import conversations as convo
from . import messages as msg
from .discovery import DiscoveryRetryPolicy, NodeService, get_discoverer, select_random
HEADER_LENGTH = 1 + 1 + 16
SIZE_UINT_32 = 4
class Event(list):
def __call__(self, *args, **kwargs):
for f in self:
f(*args, **kwargs)
class ConnectorCommand(enum.IntEnum):
Connect = 0
HandleConnectFailure = 1
HandleConnectionOpened = 2
HandleConnectionClosed = 3
HandleConnectionFailed = 4
HandleHeartbeatFailed = 5
HandleHeartbeatSuccess = 6
HandleConnectorFailed = -2
Stop = -1
class ConnectorState(enum.IntEnum):
Begin = 0
Connecting = 1
Connected = 2
Stopping = 3
Stopped = 4
class ConnectorInstruction(NamedTuple):
command: ConnectorCommand
future: Optional[asyncio.Future]
data: Optional[Any]
class Connector:
def __init__(
self,
discovery,
dispatcher,
retry_policy=None,
ctrl_queue=None,
connect_timeout=5,
name=None,
loop=None,
):
self.name = name
self.connection_counter = 0
self.dispatcher = dispatcher
self.loop = loop or asyncio.get_event_loop()
self.discovery = discovery
self.connected = Event()
self.disconnected = Event()
self.stopped = Event()
self.ctrl_queue = ctrl_queue or asyncio.Queue(loop=self.loop)
self.log = logging.get_named_logger(Connector)
self._run_loop = asyncio.ensure_future(self._run())
self.heartbeat_failures = 0
self.connect_timeout = connect_timeout
self.active_protocol = None
self.retry_policy = retry_policy or DiscoveryRetryPolicy(retries_per_node=0)
def _put_msg(self, msg):
asyncio.ensure_future(self.ctrl_queue.put(msg))
def connection_made(self, address, protocol):
self._put_msg(
ConnectorInstruction(
ConnectorCommand.HandleConnectionOpened, None, (address, protocol)
)
)
def heartbeat_received(self, conversation_id):
self.retry_policy.record_success(self.target_node)
self._put_msg(
ConnectorInstruction(
ConnectorCommand.HandleHeartbeatSuccess, None, conversation_id
)
)
def connection_lost(self, exn=None):
self.log.info("connection_lost {}".format(exn))
self.retry_policy.record_failure(self.target_node)
if exn:
self._put_msg(
ConnectorInstruction(ConnectorCommand.HandleConnectionFailed, None, exn)
)
else:
self._put_msg(
ConnectorInstruction(
ConnectorCommand.HandleConnectionClosed, None, None
)
)
def heartbeat_failed(self, exn=None):
self._put_msg(
ConnectorInstruction(ConnectorCommand.HandleHeartbeatFailed, None, exn)
)
async def start(self, target: Optional[NodeService] = None):
self.state = ConnectorState.Connecting
await self.ctrl_queue.put(
ConnectorInstruction(ConnectorCommand.Connect, None, target)
)
async def stop(self, exn=None):
self.log.info("Stopping connector")
self.state = ConnectorState.Stopping
self.log.info("In ur stop stopping ur procool")
if self.active_protocol:
await self.active_protocol.stop()
self.active_protocol = None
self._run_loop.cancel()
self.stopped(exn)
async def reconnect(self):
if self.active_protocol:
await self.active_protocol.stop()
else:
await self.start()
async def _attempt_connect(self, node):
if not node:
try:
self.log.debug("Performing node discovery")
node = self.target_node = await self.discovery.discover()
except Exception as e:
await self.ctrl_queue.put(
ConnectorInstruction(
ConnectorCommand.HandleConnectorFailed, None, e
)
)
return
self.log.info("Connecting to %s:%s", node.address, node.port)
try:
self.connection_counter += 1
protocol = PhotonPumpProtocol(
node,
self.connection_counter,
self.dispatcher,
self,
self.loop,
self.name,
)
await asyncio.wait_for(
self.loop.create_connection(lambda: protocol, node.address, node.port),
self.connect_timeout,
)
except Exception as e:
await self.ctrl_queue.put(
ConnectorInstruction(ConnectorCommand.HandleConnectFailure, None, e)
)
async def _on_transport_received(self, address, protocol):
self.log.info(
"PhotonPump is connected to eventstore instance at %s via %s",
address,
protocol,
)
self.active_protocol = protocol
await self.dispatcher.write_to(protocol.output_queue)
self.connected(address)
async def _reconnect(self, node):
if not node:
await self.start()
return
if self.retry_policy.should_retry(node):
await self.retry_policy.wait(node)
await self.start(target=node)
else:
self.log.error("Reached maximum number of retry attempts on node %s", node)
self.discovery.mark_failed(node)
await self.start()
async def _on_transport_closed(self):
self.log.info("Connection closed gracefully, restarting")
self.disconnected()
await self._reconnect(self.target_node)
async def _on_transport_error(self, exn):
self.log.info("Connection closed with error, restarting %s", exn)
self.disconnected()
await self._reconnect(self.target_node)
async def _on_connect_failed(self, exn):
self.log.info(
"Failed to connect to host %s with error %s restarting",
self.target_node,
exn,
)
self.retry_policy.record_failure(self.target_node)
await self._reconnect(self.target_node)
async def _on_failed_heartbeat(self, exn):
self.log.warn("Failed to handle a heartbeat")
self.heartbeat_failures += 1
if self.heartbeat_failures >= 3:
await self.active_protocol.stop()
self.heartbeat_failures = 0
async def _on_successful_heartbeat(self, conversation_id):
self.log.debug("Received heartbeat from conversation %s", conversation_id)
self.heartbeat_failures = 0
async def _on_connector_failed(self, exn):
self.log.error("Connector failed to find a connection")
await self.stop(exn=exn)
async def _run(self):
while True:
self.log.debug("Waiting for next message")
msg = await self.ctrl_queue.get()
self.log.debug("Connector received message %s", msg)
if msg.command == ConnectorCommand.Connect:
await self._attempt_connect(msg.data)
if msg.command == ConnectorCommand.HandleConnectFailure:
await self._on_connect_failed(msg.data)
if msg.command == ConnectorCommand.HandleConnectionOpened:
await self._on_transport_received(*msg.data)
if msg.command == ConnectorCommand.HandleConnectionClosed:
await self._on_transport_closed()
if msg.command == ConnectorCommand.HandleConnectionFailed:
await self._on_transport_error(msg.data)
if msg.command == ConnectorCommand.HandleHeartbeatFailed:
await self._on_failed_heartbeat(msg.data)
if msg.command == ConnectorCommand.HandleHeartbeatSuccess:
await self._on_successful_heartbeat(msg.data)
if msg.command == ConnectorCommand.HandleConnectorFailed:
await self._on_connector_failed(msg.data)
if msg.command == ConnectorCommand.Stop:
raise NotImplementedError("WAT DO?")
class PaceMaker:
"""
Handles heartbeat requests and responses to keep the connection alive.
"""
def __init__(
self,
output_queue: asyncio.Queue,
connector: Connector,
response_timeout=10,
heartbeat_period=30,
heartbeat_id=None,
) -> None:
self._output = output_queue
self.heartbeat_id = heartbeat_id or uuid.uuid4()
self._connector = connector
self.response_timeout = response_timeout
self.heartbeat_period = heartbeat_period
self._fut = None
async def handle_request(self, message: msg.InboundMessage):
response = convo.Heartbeat(message.conversation_id)
await response.start(self._output)
return
async def handle_response(self, message: msg.InboundMessage):
if message.conversation_id == self.heartbeat_id:
if self._fut:
self._fut.set_result(message.conversation_id)
async def send_heartbeat(self) -> asyncio.Future:
fut = asyncio.Future()
logging.debug("Sending heartbeat %s to server", self.heartbeat_id)
hb = convo.Heartbeat(self.heartbeat_id, direction=convo.Heartbeat.OUTBOUND)
await hb.start(self._output)
self._fut = fut
return fut
async def await_heartbeat_response(self):
try:
await asyncio.wait_for(self._fut, self.response_timeout)
logging.debug("Received heartbeat response from server")
self._connector.heartbeat_received(self.heartbeat_id)
except asyncio.TimeoutError as e:
logging.warning("Heartbeat %s timed out", self.heartbeat_id)
self._connector.heartbeat_failed(e)
except asyncio.CancelledError:
logging.debug("Heartbeat waiter cancelled.")
raise
except Exception as exn:
logging.exception("Heartbeat %s failed", self.heartbeat_id)
self._connector.heartbeat_failed(exn)
async def send_heartbeats(self):
while True:
try:
await self.send_heartbeat()
await self.await_heartbeat_response()
await asyncio.sleep(self.heartbeat_period)
except asyncio.CancelledError:
logging.debug("Heartbeat loop cancelled")
break
class MessageWriter:
def __init__(
self,
writer: asyncio.StreamWriter,
connection_number: int,
output_queue: asyncio.Queue,
name=None,
loop=None,
):
self._logger = logging.get_named_logger(MessageWriter, name, connection_number)
self.writer = writer
self._queue = output_queue
async def enqueue_message(self, message: msg.OutboundMessage):
await self._queue.put(message)
async def start(self):
while True:
msg = await self._queue.get()
try:
self._logger.debug("Sending message %s", msg)
self._logger.trace("Message body is %r", msg)
self.writer.write(msg.header_bytes)
self.writer.write(msg.payload)
except Exception as e:
self._logger.error("Failed to send message %s", e, exc_info=True)
try:
await self.writer.drain()
self._logger.debug("Finished drain for %s", msg)
except Exception as e:
self._logger.error(e)
class MessageReader:
MESSAGE_MIN_SIZE = SIZE_UINT_32 + HEADER_LENGTH
HEAD_PACK = struct.Struct("<IBB")
def __init__(
self,
reader: asyncio.StreamReader,
connection_number: int,
queue,
pacemaker: PaceMaker,
name=None,
loop=None,
):
self._loop = loop or asyncio.get_event_loop()
self.header_bytes = array.array("B", [0] * (self.MESSAGE_MIN_SIZE))
self.header_bytes_required = self.MESSAGE_MIN_SIZE
self.queue = queue
self.length = 0
self.message_offset = 0
self.conversation_id = None
self.message_buffer = None
self._logger = logging.get_named_logger(MessageReader, name, connection_number)
self.reader = reader
self.pacemaker = pacemaker
self._trace_enabled = self._logger.getEffectiveLevel() <= logging.TRACE
def feed_data(self, data):
self.reader.feed_data(data)
async def start(self):
"""Loop forever reading messages and invoking
the operation that caused them"""
while True:
try:
data = await self.reader.read(8192)
if self._trace_enabled:
self._logger.trace(
"Received %d bytes from remote server:\n%s",
len(data),
msg.dump(data),
)
await self.process(data)
except asyncio.CancelledError:
return
except:
logging.exception("Unhandled error in Message Reader")
raise
async def process(self, chunk: bytes):
if chunk is None:
return
chunk_offset = 0
chunk_len = len(chunk)
while chunk_offset < chunk_len:
while self.header_bytes_required and chunk_offset < chunk_len:
offset = self.MESSAGE_MIN_SIZE - self.header_bytes_required
self.header_bytes[offset] = chunk[chunk_offset]
chunk_offset += 1
self.header_bytes_required -= 1
if not self.header_bytes_required:
self._logger.insane(
"Read %d bytes for header", self.MESSAGE_MIN_SIZE
)
(self.length, self.cmd, self.flags) = self.HEAD_PACK.unpack(
self.header_bytes[0:6]
)
self.conversation_id = uuid.UUID(
bytes_le=(self.header_bytes[6:22].tobytes())
)
self._logger.insane(
"length=%d, command=%d flags=%d conversation_id=%s from header bytes=%a",
self.length,
self.cmd,
self.flags,
self.conversation_id,
self.header_bytes,
)
self.message_offset = HEADER_LENGTH
message_bytes_required = self.length - self.message_offset
self._logger.insane(
"%d of message remaining before copy", message_bytes_required
)
if message_bytes_required > 0:
if not self.message_buffer:
self.message_buffer = bytearray()
end_span = min(chunk_len, message_bytes_required + chunk_offset)
bytes_read = end_span - chunk_offset
self.message_buffer.extend(chunk[chunk_offset:end_span])
self._logger.insane("Message buffer is %s", self.message_buffer)
message_bytes_required -= bytes_read
self.message_offset += bytes_read
chunk_offset = end_span
self._logger.insane(
"%d bytes of message remaining after copy", message_bytes_required
)
if not message_bytes_required:
message = msg.InboundMessage(
self.conversation_id, self.cmd, self.message_buffer or b""
)
self._logger.trace("Received message %r", message)
if message.command == msg.TcpCommand.HeartbeatRequest:
await self.pacemaker.handle_request(message)
elif message.command == msg.TcpCommand.HeartbeatResponse:
await self.pacemaker.handle_response(message)
else:
await self.queue.put(message)
self.length = -1
self.message_offset = 0
self.conversation_id = None
self.cmd = -1
self.header_bytes_required = self.MESSAGE_MIN_SIZE
self.message_buffer = None
class MessageDispatcher:
def __init__(self, name=None, loop=None):
self.active_conversations = {}
self._logger = logging.get_named_logger(MessageDispatcher, name)
self.output = None
self._loop = loop or asyncio.get_event_loop()
async def start_conversation(
self, conversation: convo.Conversation
) -> asyncio.futures.Future:
if not conversation.one_way:
self.active_conversations[conversation.conversation_id] = (
conversation,
None,
)
if self.output:
await conversation.start(self.output)
return conversation.result
async def write_to(self, output: asyncio.Queue):
self._logger.info(
"Dispatcher has new message writer. Re-sending %s conversations",
len(self.active_conversations),
)
self.output = output
for (conversation, _) in self.active_conversations.values():
await conversation.start(self.output)
async def dispatch(self, message: msg.InboundMessage, output: asyncio.Queue):
self._logger.debug("Received message %s", message)
conversation, result = self.active_conversations.get(
message.conversation_id, (None, None)
)
if not conversation:
self._logger.error("No conversation found for message %s", message)
return
await conversation.respond_to(message, output)
if conversation.is_complete:
del self.active_conversations[conversation.conversation_id]
def has_conversation(self, id):
return id in self.active_conversations
def remove(self, id):
if id in self.active_conversations:
del self.active_conversations[id]
[docs]class Client:
"""Top level object for interacting with Eventstore.
The client is the entry point to working with Photon Pump.
It exposes high level methods that wrap the
:class:`~photonpump.conversations.Conversation` types from
photonpump.conversations.
"""
def __init__(self, connector, dispatcher, credential=None):
self.connector = connector
self.dispatcher = dispatcher
self.credential = credential
self.outstanding_heartbeat = None
[docs] async def connect(self):
"""
Sends a start message to the Connector.
"""
await self.connector.start()
[docs] async def close(self):
"""
Stop the client from sending and receiving messages.
"""
await self.connector.stop()
[docs] async def ping(self, conversation_id: uuid.UUID = None) -> float:
"""
Send a message to the remote server to check liveness.
Returns:
The round-trip time to receive a Pong message in fractional seconds
Examples:
>>> async with connect() as conn:
>>> print("Sending a PING to the server")
>>> time_secs = await conn.ping()
>>> print("Received a PONG after {} secs".format(time_secs))
"""
cmd = convo.Ping(conversation_id=conversation_id or uuid.uuid4())
result = await self.dispatcher.start_conversation(cmd)
return await result
[docs] async def publish_event(
self,
stream: str,
type: str,
body: Optional[Any] = None,
id: Optional[uuid.UUID] = None,
metadata: Optional[Any] = None,
expected_version: int = -2,
require_master: bool = False,
) -> None:
"""
Publish a single event to the EventStore.
This method publishes a single event to the remote server and waits
for acknowledgement.
Args:
stream: The stream to publish the event to.
type: the event's type.
body: a serializable body for the event.
id: a unique id for the event. PhotonPump will automatically generate an
id if none is provided.
metadata: Optional serializable metadata block for the event.
expected_version: Used for concurrency control.
If a positive integer is provided, EventStore will check that the stream
is at that version before accepting a write.
There are three magic values:
-4: StreamMustExist. Checks that the stream already exists.
-2: Any. Disables concurrency checks
-1: NoStream. Checks that the stream does not yet exist.
0: EmptyStream. Checks that the stream has been explicitly created but
does not yet contain any events.
require_master: If true, slave nodes will reject this message.
Examples:
>>> async with connect() as conn:
>>> await conn.publish_event(
>>> "inventory_item-1",
>>> "item_created",
>>> body={ "item-id": 1, "created-date": "2018-08-19" },
>>> expected_version=ExpectedVersion.StreamMustNotExist
>>> )
>>>
>>> await conn.publish_event(
>>> "inventory_item-1",
>>> "item_deleted",
>>> expected_version=1,
>>> metadata={'deleted-by': 'bob' }
>>> )
"""
event = msg.NewEvent(type, id or uuid.uuid4(), body, metadata)
conversation = convo.WriteEvents(
stream,
[event],
expected_version=expected_version,
require_master=require_master,
)
result = await self.dispatcher.start_conversation(conversation)
return await result
async def publish(
self,
stream: str,
events: Sequence[msg.NewEventData],
expected_version=msg.ExpectedVersion.Any,
require_master=False,
):
cmd = convo.WriteEvents(
stream,
events,
expected_version=expected_version,
require_master=require_master,
)
result = await self.dispatcher.start_conversation(cmd)
return await result
[docs] async def get_event(
self,
stream: str,
event_number: int,
resolve_links=True,
require_master=False,
correlation_id: uuid.UUID = None,
) -> msg.Event:
"""
Get a single event by stream and event number.
Args:
stream: The name of the stream containing the event.
event_number: The sequence number of the event to read.
resolve_links (optional): True if eventstore should
automatically resolve Link Events, otherwise False.
required_master (optional): True if this command must be
sent direct to the master node, otherwise False.
correlation_id (optional): A unique identifer for this
command.
Returns:
The resolved event if found, else None.
Examples:
>>> async with connection() as conn:
>>> await conn.publish("inventory_item-1", "item_created")
>>> event = await conn.get_event("inventory_item-1", 1)
>>> print(event)
"""
correlation_id = correlation_id or uuid.uuid4()
cmd = convo.ReadEvent(
stream,
event_number,
resolve_links,
require_master,
conversation_id=correlation_id,
)
result = await self.dispatcher.start_conversation(cmd)
return await result
async def get(
self,
stream: str,
direction: msg.StreamDirection = msg.StreamDirection.Forward,
from_event: int = 0,
max_count: int = 100,
resolve_links: bool = True,
require_master: bool = False,
correlation_id: uuid.UUID = None,
):
correlation_id = correlation_id
cmd = convo.ReadStreamEvents(
stream,
from_event,
max_count,
resolve_links,
require_master,
direction=direction,
)
result = await self.dispatcher.start_conversation(cmd)
return await result
async def iter(
self,
stream: str,
direction: msg.StreamDirection = msg.StreamDirection.Forward,
from_event: int = None,
batch_size: int = 100,
resolve_links: bool = True,
require_master: bool = False,
correlation_id: uuid.UUID = None,
):
correlation_id = correlation_id
cmd = convo.IterStreamEvents(
stream, from_event, batch_size, resolve_links, direction=direction
)
result = await self.dispatcher.start_conversation(cmd)
iterator = await result
async for event in iterator:
yield event
async def create_subscription(
self,
name: str,
stream: str,
resolve_links: bool = True,
start_from: int = -1,
timeout_ms: int = 30000,
record_statistics: bool = False,
live_buffer_size: int = 500,
read_batch_size: int = 500,
buffer_size: int = 1000,
max_retry_count: int = 10,
prefer_round_robin: bool = False,
checkpoint_after_ms: int = 2000,
checkpoint_max_count: int = 1000,
checkpoint_min_count: int = 10,
subscriber_max_count: int = 10,
credentials: msg.Credential = None,
conversation_id: uuid.UUID = None,
consumer_strategy: str = msg.ROUND_ROBIN,
):
cmd = convo.CreatePersistentSubscription(
name,
stream,
resolve_links=resolve_links,
start_from=start_from,
timeout_ms=timeout_ms,
record_statistics=record_statistics,
live_buffer_size=live_buffer_size,
read_batch_size=read_batch_size,
buffer_size=buffer_size,
max_retry_count=max_retry_count,
prefer_round_robin=prefer_round_robin,
checkpoint_after_ms=checkpoint_after_ms,
checkpoint_max_count=checkpoint_max_count,
checkpoint_min_count=checkpoint_min_count,
subscriber_max_count=subscriber_max_count,
credentials=credentials or self.credential,
conversation_id=conversation_id,
consumer_strategy=consumer_strategy,
)
future = await self.dispatcher.start_conversation(cmd)
return await future
async def connect_subscription(
self,
subscription: str,
stream: str,
conversation_id: Optional[uuid.UUID] = None,
):
cmd = convo.ConnectPersistentSubscription(
subscription,
stream,
credentials=self.credential,
conversation_id=conversation_id,
)
future = await self.dispatcher.start_conversation(cmd)
return await future
[docs] async def subscribe_to(
self, stream, start_from=-1, resolve_link_tos=True, batch_size: int = 100
):
"""
Subscribe to receive notifications when a new event is published
to a stream.
Args:
stream: The name of the stream.
start_from (optional): The first event to read.
This parameter defaults to the magic value -1 which is treated
as meaning "from the end of the stream". IF this value is used,
no historical events will be returned.
For any other value, photonpump will read all events from
start_from until the end of the stream in pages of max_size
before subscribing to receive new events as they arrive.
resolve_links (optional): True if eventstore should
automatically resolve Link Events, otherwise False.
required_master (optional): True if this command must be
sent direct to the master node, otherwise False.
correlation_id (optional): A unique identifer for this
command.
batch_size (optioal): The number of events to pull down from
eventstore in one go.
Returns:
A VolatileSubscription.
Examples:
>>> async with connection() as conn:
>>> # Subscribe only to NEW events on the cpu-metrics stream
>>> subs = await conn.subscribe_to("price-changes")
>>> async for event in subs.events:
>>> print(event)
>>> async with connection() as conn:
>>> # Read all historical events and then receive updates as they
>>> # arrive.
>>> subs = await conn.subscribe_to("price-changes", start_from=0)
>>> async for event in subs.events:
>>> print(event)
"""
if start_from == -1:
cmd = convo.SubscribeToStream(stream, resolve_link_tos)
else:
cmd = convo.CatchupSubscription(stream, start_from, batch_size)
future = await self.dispatcher.start_conversation(cmd)
return await future
async def __aenter__(self):
await self.connect()
return self
async def __aexit__(self, exc_type, exc, tb):
await self.close()
class PhotonPumpProtocol(asyncio.streams.FlowControlMixin):
def __init__(
self,
addr: NodeService,
connection_number: int,
dispatcher: MessageDispatcher,
connector,
loop,
name,
):
self.name = name
self._log = logging.get_named_logger(
PhotonPumpProtocol, self.name, connection_number
)
self.transport = None
self.loop = loop or asyncio.get_event_loop()
super().__init__(self.loop)
self.connection_number = connection_number
self.node = addr
self.dispatcher = dispatcher
self.connector = connector
def connection_made(self, transport):
self._log.debug("Connection made.")
self.input_queue = asyncio.Queue(loop=self.loop)
self.output_queue = asyncio.Queue(loop=self.loop)
self.transport = transport
stream_reader = asyncio.StreamReader(loop=self.loop)
stream_reader.set_transport(transport)
stream_writer = asyncio.StreamWriter(transport, self, stream_reader, self.loop)
self.pacemaker = PaceMaker(self.output_queue, self.connector)
self.reader = MessageReader(
stream_reader,
self.connection_number,
self.input_queue,
self.pacemaker,
name=self.name,
)
self.writer = MessageWriter(
stream_writer, self.connection_number, self.output_queue, name=self.name
)
self.write_loop = asyncio.ensure_future(self.writer.start())
self.read_loop = asyncio.ensure_future(self.reader.start())
self.dispatch_loop = asyncio.ensure_future(self.dispatch())
self.heartbeat_loop = asyncio.ensure_future(self.pacemaker.send_heartbeats())
self.connector.connection_made(self.node, self)
def data_received(self, data):
self.reader.feed_data(data)
async def dispatch(self):
while True:
try:
next_msg = await self.input_queue.get()
await self.dispatcher.dispatch(next_msg, self.output_queue)
except asyncio.CancelledError:
break
except:
logging.exception("Dispatch loop failed")
break
def connection_lost(self, exn):
self._log.debug("Connection lost")
super().connection_lost(exn)
self._connection_lost = True
self.connector.connection_lost(exn)
async def stop(self):
self._log.debug("Stopping")
try:
self.read_loop.cancel()
self.write_loop.cancel()
self.dispatch_loop.cancel()
self.heartbeat_loop.cancel()
self._log.debug("Waiting for coroutines to end")
await asyncio.gather(
self.read_loop,
self.write_loop,
self.dispatch_loop,
self.heartbeat_loop,
loop=self.loop,
return_exceptions=True,
)
self.transport.close()
self._log.debug("Closed the transport")
except asyncio.CancelledError:
pass
[docs]def connect(
host="localhost",
port=1113,
discovery_host=None,
discovery_port=2113,
username=None,
password=None,
loop=None,
name=None,
selector=select_random,
) -> Client:
""" Create a new client.
Examples:
Since the Client is an async context manager, we can use it in a
with block for automatic connect/disconnect semantics.
>>> async with connect(host='127.0.0.1', port=1113) as c:
>>> await c.ping()
Or we can call connect at a more convenient moment
>>> c = connect()
>>> await c.connect()
>>> await c.ping()
>>> await c.close()
For cluster discovery cases, we can provide a discovery host and
port. The host may be an IP or DNS entry. If you provide a DNS
entry, discovery will choose randomly from the registered IP
addresses for the hostname.
>>> async with connect(discovery_host="eventstore.test") as c:
>>> await c.ping()
The discovery host returns gossip data about the cluster. We use the
gossip to select a node at random from the avaialble cluster members.
If you're using
:meth:`persistent subscriptions <photonpump.connection.Client.create_subscription>`
you will always want to connect to the master node of the cluster.
The selector parameter is a function that chooses an available node from
the gossip result. To select the master node, use the
:func:`photonpump.discovery.prefer_master` function. This function will return
the master node if there is a live master, and a random replica otherwise.
All requests to the server can be made with the require_master flag which
will raise an error if the current node is not a master.
>>> async with connect(
>>> discovery_host="eventstore.test",
>>> selector=discovery.prefer_master,
>>> ) as c:
>>> await c.ping(require_master=True)
Conversely, you might want to avoid connecting to the master node for reasons
of scalability. For this you can use the
:func:`photonpump.discovery.prefer_replica` function.
>>> async with connect(
>>> discovery_host="eventstore.test",
>>> selector=discovery.prefer_replica,
>>> ) as c:
>>> await c.ping()
For some operations, you may need to authenticate your requests by
providing a username and password to the client.
>>> async with connect(username='admin', password='changeit') as c:
>>> await c.ping()
Ordinarily you will create a single Client per application, but for
advanced scenarios you might want multiple connections. In this
situation, you can name each connection in order to get better logging.
>>> async with connect(name="event-reader"):
>>> await c.ping()
>>> async with connect(name="event-writer"):
>>> await c.ping()
Args:
host: The IP or DNS entry to connect with, defaults to 'localhost'.
port: The port to connect with, defaults to 1113.
discovery_host: The IP or DNS entry to use for cluster discovery.
discovery_port: The port to use for cluster discovery, defaults to 2113.
username: The username to use when communicating with eventstore.
password: The password to use when communicating with eventstore.
loop:An Asyncio event loop.
selector: An optional function that selects one element from a list of
:class:`photonpump.disovery.DiscoveredNode` elements.
"""
discovery = get_discoverer(host, port, discovery_host, discovery_port, selector)
dispatcher = MessageDispatcher(name=name, loop=loop)
connector = Connector(discovery, dispatcher, name=name)
credential = msg.Credential(username, password) if username and password else None
return Client(connector, dispatcher, credential=credential)