Photonpump API Reference

photonpump.connection.connect(host='localhost', port=1113, discovery_host=None, discovery_port=2113, username=None, password=None, loop=None, name=None, selector=<function select_random>) → photonpump.connection.Client[source]

Create a new client.

Examples

Since the Client is an async context manager, we can use it in an async with block for automatic connect/disconnect semantics.

>>> async with connect(host='127.0.0.1', port=1113) as c:
>>>     await c.ping()

Or we can call connect at a more convenient moment:

>>> c = connect()
>>> await c.connect()
>>> await c.ping()
>>> await c.close()
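
When connecting manually, it is worth making sure close() still runs if a request fails. The sketch below wraps the documented connect(), ping() and close() calls in try/finally. The names ping_with_cleanup and StubClient are hypothetical and introduced only for illustration; the stub stands in for a real Client so the snippet runs without a server.

```python
import asyncio


async def ping_with_cleanup(client):
    """Connect, ping once, and always close the client afterwards."""
    await client.connect()
    try:
        return await client.ping()
    finally:
        # Runs whether ping() returned or raised.
        await client.close()


class StubClient:
    """Hypothetical stand-in for a Client; it only records the calls made."""

    def __init__(self):
        self.calls = []

    async def connect(self):
        self.calls.append("connect")

    async def ping(self):
        self.calls.append("ping")
        return 0.001  # pretend round-trip time in seconds

    async def close(self):
        self.calls.append("close")


stub = StubClient()
rtt = asyncio.run(ping_with_cleanup(stub))
print(stub.calls, rtt)
```

With a real server, the object returned by connect() could be passed in place of the stub, assuming its connect(), ping() and close() methods are awaited as in the example above.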

For cluster discovery cases, we can provide a discovery host and port. The host may be an IP or DNS entry. If you provide a DNS entry, discovery will choose randomly from the registered IP addresses for the hostname.

>>> async with connect(discovery_host="eventstore.test") as c:
>>>     await c.ping()

The discovery host returns gossip data about the cluster. We use the gossip to select a node at random from the available cluster members.

If you’re using persistent subscriptions you will always want to connect to the master node of the cluster. The selector parameter is a function that chooses an available node from the gossip result. To select the master node, use the photonpump.discovery.prefer_master() function. This function will return the master node if there is a live master, and a random replica otherwise. All requests to the server can be made with the require_master flag which will raise an error if the current node is not a master.

>>> async with connect(
>>>     discovery_host="eventstore.test",
>>>     selector=discovery.prefer_master,
>>> ) as c:
>>>     await c.ping(require_master=True)

Conversely, you might want to avoid connecting to the master node for reasons of scalability. For this you can use the photonpump.discovery.prefer_replica() function.

>>> async with connect(
>>>     discovery_host="eventstore.test",
>>>     selector=discovery.prefer_replica,
>>> ) as c:
>>>     await c.ping()
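
A selector is just a function from a list of nodes to one node (or None), so we can also write our own. The following is a minimal sketch, not one of photonpump's built-in selectors: prefer_alive is a hypothetical name, and Node is a local stand-in that mirrors the documented field order of photonpump.discovery.DiscoveredNode so that the snippet runs without the library. The state strings are placeholders, not real NodeState values.

```python
import random
from collections import namedtuple

# Local stand-in with the same field order as the documented DiscoveredNode.
Node = namedtuple(
    "Node",
    ["state", "is_alive", "internal_tcp", "external_tcp",
     "internal_http", "external_http"],
)


def prefer_alive(nodes):
    """Return a random node that reports is_alive, or None if there is none."""
    alive = [node for node in nodes if node.is_alive]
    return random.choice(alive) if alive else None


nodes = [
    Node("placeholder-state", False, None, None, None, None),
    Node("placeholder-state", True, None, None, None, None),
]
chosen = prefer_alive(nodes)
print(chosen.is_alive)  # True: only the second node is alive
```

A function of this shape could be passed as selector=prefer_alive. Whether the gossip list has already been filtered to live nodes before the selector runs is not stated here, so the is_alive check may be redundant in practice.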

For some operations, you may need to authenticate your requests by providing a username and password to the client.

>>> async with connect(username='admin', password='changeit') as c:
>>>     await c.ping()

Ordinarily you will create a single Client per application, but for advanced scenarios you might want multiple connections. In this situation, you can name each connection in order to get better logging.

>>> async with connect(name="event-reader") as c:
>>>     await c.ping()
>>> async with connect(name="event-writer") as c:
>>>     await c.ping()
Parameters:
  • host – The IP or DNS entry to connect with, defaults to ‘localhost’.
  • port – The port to connect with, defaults to 1113.
  • discovery_host – The IP or DNS entry to use for cluster discovery.
  • discovery_port – The port to use for cluster discovery, defaults to 2113.
  • username – The username to use when communicating with eventstore.
  • password – The password to use when communicating with eventstore.
  • loop – An Asyncio event loop.
  • name – An optional name for the connection, used to identify it in log output.
  • selector – An optional function that selects one element from a list of photonpump.discovery.DiscoveredNode elements.
class photonpump.connection.Client(connector, dispatcher, credential=None)[source]

Top level object for interacting with Eventstore.

The client is the entry point to working with Photon Pump. It exposes high level methods that wrap the Conversation types from photonpump.conversations.

close()[source]

Stop the client from sending and receiving messages.

connect()[source]

Sends a start message to the Connector.

get_event(stream: str, event_number: int, resolve_links=True, require_master=False, correlation_id: uuid.UUID = None) → photonpump.messages.Event[source]

Get a single event by stream and event number.

Parameters:
  • stream – The name of the stream containing the event.
  • event_number – The sequence number of the event to read.
  • resolve_links (optional) – True if eventstore should automatically resolve Link Events, otherwise False.
  • require_master (optional) – True if this command must be sent direct to the master node, otherwise False.
  • correlation_id (optional) – A unique identifier for this command.
Returns:

The resolved event if found, else None.

Examples

>>> async with connect() as conn:
>>>     await conn.publish_event("inventory_item-1", "item_created")
>>>     event = await conn.get_event("inventory_item-1", 1)
>>>     print(event)
ping(conversation_id: uuid.UUID = None) → float[source]

Send a message to the remote server to check liveness.

Returns: The round-trip time to receive a Pong message, in fractional seconds.

Examples

>>> async with connect() as conn:
>>>     print("Sending a PING to the server")
>>>     time_secs = await conn.ping()
>>>     print("Received a PONG after {} secs".format(time_secs))
publish_event(stream: str, type: str, body: Optional[Any] = None, id: Optional[uuid.UUID] = None, metadata: Optional[Any] = None, expected_version: int = -2, require_master: bool = False) → None[source]

Publish a single event to the EventStore.

This method publishes a single event to the remote server and waits for acknowledgement.

Parameters:
  • stream – The stream to publish the event to.
  • type – The event’s type.
  • body – A serializable body for the event.
  • id – A unique id for the event. PhotonPump will automatically generate an id if none is provided.
  • metadata – Optional serializable metadata block for the event.
  • expected_version

    Used for concurrency control. If a positive integer is provided, EventStore will check that the stream is at that version before accepting a write.

    There are four magic values:

      • -4: StreamMustExist. Checks that the stream already exists.
      • -2: Any. Disables concurrency checks.
      • -1: NoStream. Checks that the stream does not yet exist.
      • 0: EmptyStream. Checks that the stream has been explicitly created but does not yet contain any events.
  • require_master – If true, slave nodes will reject this message.
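
The expected_version rules above can be read as a small decision table. The sketch below is a toy model of those rules as they are described in this parameter list; it is not EventStore's implementation and has not been checked against a server. The function name write_accepted and the stream_exists / current_version arguments are hypothetical, with current_version set to None for a stream that holds no events.

```python
from typing import Optional

# Magic values as listed in the parameter description.
STREAM_MUST_EXIST = -4
ANY = -2
NO_STREAM = -1
EMPTY_STREAM = 0


def write_accepted(expected_version: int,
                   stream_exists: bool,
                   current_version: Optional[int]) -> bool:
    """Toy model: would a write with this expected_version be accepted?"""
    if expected_version == ANY:
        return True  # concurrency checks disabled
    if expected_version == STREAM_MUST_EXIST:
        return stream_exists
    if expected_version == NO_STREAM:
        return not stream_exists
    if expected_version == EMPTY_STREAM:
        # Stream was created but holds no events yet.
        return stream_exists and current_version is None
    # Positive integer: the stream must be at exactly that version.
    return stream_exists and current_version == expected_version


print(write_accepted(NO_STREAM, stream_exists=False, current_version=None))  # True
print(write_accepted(5, stream_exists=True, current_version=4))              # False
```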

Examples

>>> async with connect() as conn:
>>>     await conn.publish_event(
>>>         "inventory_item-1",
>>>         "item_created",
>>>         body={ "item-id": 1, "created-date": "2018-08-19" },
>>>         expected_version=ExpectedVersion.StreamMustNotExist
>>>     )
>>>
>>>     await conn.publish_event(
>>>         "inventory_item-1",
>>>         "item_deleted",
>>>         expected_version=1,
>>>         metadata={'deleted-by': 'bob' }
>>>     )
subscribe_to(stream, start_from=-1, resolve_link_tos=True, batch_size: int = 100)[source]

Subscribe to receive notifications when a new event is published to a stream.

Parameters:
  • stream – The name of the stream.
  • start_from (optional) –

    The first event to read. This parameter defaults to the magic value -1, which is treated as meaning “from the end of the stream”. If this value is used, no historical events will be returned.

    For any other value, photonpump will read all events from start_from until the end of the stream in pages of batch_size before subscribing to receive new events as they arrive.

  • resolve_link_tos (optional) – True if eventstore should automatically resolve Link Events, otherwise False.
  • require_master (optional) – True if this command must be sent direct to the master node, otherwise False.
  • correlation_id (optional) – A unique identifier for this command.
  • batch_size (optional) – The number of events to pull down from eventstore in one go.
Returns:

A VolatileSubscription.
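
The catch-up behaviour described for start_from can be pictured as paging through history before switching to live events. The following is a toy model over an in-memory list, assuming the page size is the batch_size parameter; read_in_pages is a hypothetical helper and does not reflect how photonpump talks to the server.

```python
from typing import Iterator, List, Sequence


def read_in_pages(events: Sequence[str],
                  start_from: int,
                  batch_size: int) -> Iterator[List[str]]:
    """Yield stored events from start_from onwards, batch_size at a time."""
    if start_from == -1:
        return  # "from the end of the stream": no history is replayed
    position = start_from
    while position < len(events):
        yield list(events[position:position + batch_size])
        position += batch_size


history = ["e0", "e1", "e2", "e3", "e4"]
pages = list(read_in_pages(history, start_from=1, batch_size=2))
print(pages)  # [['e1', 'e2'], ['e3', 'e4']]
```

Once the last page has been delivered, a real subscription would continue with events as they arrive; that live phase is left out of this sketch.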

Examples

>>> async with connect() as conn:
>>>     # Subscribe only to NEW events on the price-changes stream
>>>     subs = await conn.subscribe_to("price-changes")
>>>     async for event in subs.events:
>>>         print(event)
>>> async with connect() as conn:
>>>     # Read all historical events and then receive updates as they
>>>     # arrive.
>>>     subs = await conn.subscribe_to("price-changes", start_from=0)
>>>     async for event in subs.events:
>>>         print(event)
class photonpump.messages.ContentType[source]

An enumeration.

photonpump.messages.EventRecord

alias of photonpump.messages.photonpump_eventrecord

class photonpump.messages.ExpectedVersion[source]

Static values for concurrency control

Any

No concurrency control.

StreamMustNotExist

The request should fail if the stream already exists.

StreamMustBeEmpty

The request should fail if the stream does not exist, or if the stream already contains events.

StreamMustExist

The request should fail if the stream does not exist.

photonpump.messages.Header

alias of photonpump.messages.photonpump_result_header

photonpump.messages.NewEvent(type: str, id: uuid.UUID = None, data: Dict[str, Any] = None, metadata: Dict[str, Any] = None) → photonpump.messages.photonpump_event[source]

Build the data structure for a new event.

Parameters:
  • type – An event type.
  • id – The uuid identifier for the event.
  • data – A dict containing data for the event. These data must be json serializable.
  • metadata – A dict containing metadata about the event. These must be json serializable.
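
Because data and metadata must be JSON serializable, it can help to check a payload before building an event. The sketch below uses only the standard library; is_json_serializable is a hypothetical helper, and it assumes plain json.dumps is a fair proxy for what photonpump accepts, which is not confirmed here.

```python
import json
import uuid


def is_json_serializable(value) -> bool:
    """Return True if json.dumps can encode value with its default settings."""
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        return False
    return True


order_id = uuid.uuid4()
raw = {"order-id": order_id}         # UUID objects are not JSON encodable
clean = {"order-id": str(order_id)}  # converting to str makes it encodable

print(is_json_serializable(raw), is_json_serializable(clean))  # False True
```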
photonpump.messages.NewEventData

alias of photonpump.messages.photonpump_event

class photonpump.messages.NotHandledReason

An enumeration.

class photonpump.messages.OperationFlags[source]

An enumeration.

class photonpump.messages.OperationResult

An enumeration.

class photonpump.messages.ReadEventResult

An enumeration.

class photonpump.messages.ReadStreamResult

An enumeration.

class photonpump.messages.StreamDirection[source]

An enumeration.

class photonpump.messages.StreamSlice(events: Sequence[photonpump.messages.Event], next_event_number: int, last_event_number: int, prepare_position: int = None, commit_position: int = None, is_end_of_stream: bool = False)[source]
class photonpump.messages.SubscriptionDropReason

An enumeration.

photonpump.messages.SubscriptionResult

alias of photonpump.messages.CreatePersistentSubscriptionResult

class photonpump.messages.TcpCommand[source]

An enumeration.

class photonpump.discovery.DiscoveredNode(state, is_alive, internal_tcp, external_tcp, internal_http, external_http)[source]
external_http

Alias for field number 5

external_tcp

Alias for field number 3

internal_http

Alias for field number 4

internal_tcp

Alias for field number 2

is_alive

Alias for field number 1

state

Alias for field number 0

exception photonpump.discovery.DiscoveryFailed[source]
class photonpump.discovery.DiscoveryStats(node, attempts, successes, failures, consecutive_failures)[source]
attempts

Alias for field number 1

consecutive_failures

Alias for field number 4

failures

Alias for field number 3

node

Alias for field number 0

successes

Alias for field number 2

class photonpump.discovery.NodeService(address, port, secure_port)[source]
address

Alias for field number 0

port

Alias for field number 1

secure_port

Alias for field number 2

class photonpump.discovery.NodeState[source]

An enumeration.

class photonpump.discovery.Stats[source]
photonpump.discovery.prefer_master(nodes: List[photonpump.discovery.DiscoveredNode]) → Optional[photonpump.discovery.DiscoveredNode][source]

Select the master if available, otherwise fall back to a replica.

photonpump.discovery.prefer_replica(nodes: List[photonpump.discovery.DiscoveredNode]) → Optional[photonpump.discovery.DiscoveredNode][source]

Select a random replica if any are available or fall back to the master.

photonpump.discovery.select_random(nodes: List[photonpump.discovery.DiscoveredNode]) → Optional[photonpump.discovery.DiscoveredNode][source]

Return a random node.