/opt/imunify360/venv/lib/python3.11/site-packages/nats/js
# Copyright 2021-2022 The NATS Authors # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # from __future__ import annotations import asyncio import json import time from email.parser import BytesParser from secrets import token_hex from typing import ( TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional, ) import nats.errors import nats.js.errors from nats.aio.msg import Msg from nats.aio.subscription import Subscription from nats.js import api from nats.js.errors import ( BadBucketError, BucketNotFoundError, FetchTimeoutError, InvalidBucketNameError, NotFoundError, ) from nats.js.kv import KeyValue from nats.js.manager import JetStreamManager from nats.js.object_store import ( OBJ_ALL_CHUNKS_PRE_TEMPLATE, OBJ_ALL_META_PRE_TEMPLATE, OBJ_STREAM_TEMPLATE, VALID_BUCKET_RE, ObjectStore, ) if TYPE_CHECKING: from nats import NATS NO_RESPONDERS_STATUS = "503" NATS_HDR_LINE = bytearray(b"NATS/1.0") NATS_HDR_LINE_SIZE = len(NATS_HDR_LINE) _CRLF_ = b"\r\n" _CRLF_LEN_ = len(_CRLF_) KV_STREAM_TEMPLATE = "KV_{bucket}" KV_PRE_TEMPLATE = "$KV.{bucket}." Callback = Callable[["Msg"], Awaitable[None]] # For JetStream the default pending limits are larger. DEFAULT_JS_SUB_PENDING_MSGS_LIMIT = 512 * 1024 DEFAULT_JS_SUB_PENDING_BYTES_LIMIT = 256 * 1024 * 1024 # Max history limit for key value. KV_MAX_HISTORY = 64 class JetStreamContext(JetStreamManager): """ Fully featured context for interacting with JetStream. :param conn: NATS Connection :param prefix: Default JetStream API Prefix. :param domain: Optional domain used by the JetStream API. :param timeout: Timeout for all JS API actions. :param publish_async_max_pending: Maximum outstanding async publishes that can be inflight at one time. :: import asyncio import nats async def main(): nc = await nats.connect() js = nc.jetstream() await js.add_stream(name='hello', subjects=['hello']) ack = await js.publish('hello', b'Hello JS!') print(f'Ack: stream={ack.stream}, sequence={ack.seq}') # Ack: stream=hello, sequence=1 await nc.close() if __name__ == '__main__': asyncio.run(main()) """ def __init__( self, conn: NATS, prefix: str = api.DEFAULT_PREFIX, domain: Optional[str] = None, timeout: float = 5, publish_async_max_pending: int = 4000, ) -> None: self._prefix = prefix if domain is not None: self._prefix = f"$JS.{domain}.API" self._nc = conn self._timeout = timeout self._hdr_parser = BytesParser() self._async_reply_prefix: Optional[bytearray] = None self._publish_async_futures: Dict[str, asyncio.Future] = {} self._publish_async_completed_event = asyncio.Event() self._publish_async_completed_event.set() self._publish_async_pending_semaphore = asyncio.Semaphore(publish_async_max_pending) @property def _jsm(self) -> JetStreamManager: return JetStreamManager( conn=self._nc, prefix=self._prefix, timeout=self._timeout, ) async def _init_async_reply(self) -> None: self._publish_async_futures = {} self._async_reply_prefix = self._nc._inbox_prefix[:] self._async_reply_prefix.extend(b".") self._async_reply_prefix.extend(self._nc._nuid.next()) self._async_reply_prefix.extend(b".") async_reply_subject = self._async_reply_prefix[:] async_reply_subject.extend(b"*") await self._nc.subscribe(async_reply_subject.decode(), cb=self._handle_async_reply) async def _handle_async_reply(self, msg: Msg) -> None: token = msg.subject[len(self._nc._inbox_prefix) + 22 + 2 :] future = self._publish_async_futures.get(token) if not future: return if future.done(): return # Handle no responders if msg.headers and msg.headers.get(api.Header.STATUS) == NO_RESPONDERS_STATUS: future.set_exception(nats.js.errors.NoStreamResponseError) return # Handle response errors try: resp = json.loads(msg.data) if "error" in resp: err = nats.js.errors.APIError.from_error(resp["error"]) future.set_exception(err) return ack = api.PubAck.from_response(resp) future.set_result(ack) except (asyncio.CancelledError, asyncio.InvalidStateError): pass async def publish( self, subject: str, payload: bytes = b"", timeout: Optional[float] = None, stream: Optional[str] = None, headers: Optional[Dict[str, Any]] = None, msg_ttl: Optional[float] = None, ) -> api.PubAck: """ publish emits a new message to JetStream and waits for acknowledgement. :param subject: Subject to publish to. :param payload: Message payload. :param timeout: Request timeout in seconds. :param stream: Expected stream name. :param headers: Message headers. :param msg_ttl: Per-message TTL in seconds (requires NATS Server 2.11+). """ hdr = headers if timeout is None: timeout = self._timeout if stream is not None: hdr = hdr or {} hdr[api.Header.EXPECTED_STREAM] = stream if msg_ttl is not None: hdr = hdr or {} # TTL header accepts seconds as integer or duration string hdr[api.Header.MSG_TTL] = str(int(msg_ttl)) try: msg = await self._nc.request( subject, payload, timeout=timeout, headers=hdr, ) except nats.errors.NoRespondersError: raise nats.js.errors.NoStreamResponseError resp = json.loads(msg.data) if "error" in resp: raise nats.js.errors.APIError.from_error(resp["error"]) return api.PubAck.from_response(resp) async def publish_async( self, subject: str, payload: bytes = b"", wait_stall: Optional[float] = None, stream: Optional[str] = None, headers: Optional[Dict] = None, msg_ttl: Optional[float] = None, ) -> asyncio.Future[api.PubAck]: """ emits a new message to JetStream and returns a future that can be awaited for acknowledgement. :param subject: Subject to publish to. :param payload: Message payload. :param wait_stall: Maximum time to wait for semaphore in seconds. :param stream: Expected stream name. :param headers: Message headers. :param msg_ttl: Per-message TTL in seconds (requires NATS Server 2.11+). """ if not self._async_reply_prefix: await self._init_async_reply() assert self._async_reply_prefix hdr = headers if stream is not None: hdr = hdr or {} hdr[api.Header.EXPECTED_STREAM] = stream if msg_ttl is not None: hdr = hdr or {} # TTL header accepts seconds as integer or duration string hdr[api.Header.MSG_TTL] = str(int(msg_ttl)) try: await asyncio.wait_for(self._publish_async_pending_semaphore.acquire(), timeout=wait_stall) except (asyncio.TimeoutError, asyncio.CancelledError): raise nats.js.errors.TooManyStalledMsgsError # Use a new NUID + couple of unique token bytes to identify the request, # then use the future to get the response. token = self._nc._nuid.next() token.extend(token_hex(2).encode()) inbox = self._async_reply_prefix[:] inbox.extend(token) future: asyncio.Future = asyncio.Future() def handle_done(future): self._publish_async_futures.pop(token.decode(), None) if len(self._publish_async_futures) == 0: self._publish_async_completed_event.set() self._publish_async_pending_semaphore.release() future.add_done_callback(handle_done) self._publish_async_futures[token.decode()] = future if self._publish_async_completed_event.is_set(): self._publish_async_completed_event.clear() await self._nc.publish(subject, payload, reply=inbox.decode(), headers=hdr) return future def publish_async_pending(self) -> int: """ returns the number of pending async publishes. """ return len(self._publish_async_futures) async def publish_async_completed(self) -> None: """ waits for all pending async publishes to be completed. """ await self._publish_async_completed_event.wait() async def subscribe( self, subject: str, queue: Optional[str] = None, cb: Optional[Callback] = None, durable: Optional[str] = None, stream: Optional[str] = None, config: Optional[api.ConsumerConfig] = None, manual_ack: bool = False, ordered_consumer: bool = False, idle_heartbeat: Optional[float] = None, flow_control: bool = False, pending_msgs_limit: int = DEFAULT_JS_SUB_PENDING_MSGS_LIMIT, pending_bytes_limit: int = DEFAULT_JS_SUB_PENDING_BYTES_LIMIT, deliver_policy: Optional[api.DeliverPolicy] = None, headers_only: Optional[bool] = None, inactive_threshold: Optional[float] = None, ) -> PushSubscription: """Create consumer if needed and push-subscribe to it. 1. Check if consumer exists. 2. Creates consumer if needed. 3. Calls `subscribe_bind`. :param subject: Subject from a stream from JetStream. :param queue: Deliver group name from a set a of queue subscribers. :param durable: Name of the durable consumer to which the the subscription should be bound. :param stream: Name of the stream to which the subscription should be bound. If not set, then the client will automatically look it up based on the subject. :param manual_ack: Disables auto acking for async subscriptions. :param ordered_consumer: Enable ordered consumer mode. :param idle_heartbeat: Enable Heartbeats for a consumer to detect failures. :param flow_control: Enable Flow Control for a consumer. :: import asyncio import nats async def main(): nc = await nats.connect() js = nc.jetstream() await js.add_stream(name='hello', subjects=['hello']) await js.publish('hello', b'Hello JS!') async def cb(msg): print('Received:', msg) # Ephemeral Async Subscribe await js.subscribe('hello', cb=cb) # Durable Async Subscribe # NOTE: Only one subscription can be bound to a durable name. It also auto acks by default. await js.subscribe('hello', cb=cb, durable='foo') # Durable Sync Subscribe # NOTE: Sync subscribers do not auto ack. await js.subscribe('hello', durable='bar') # Queue Async Subscribe # NOTE: Here 'workers' becomes deliver_group, durable name and queue name. await js.subscribe('hello', 'workers', cb=cb) if __name__ == '__main__': asyncio.run(main()) """ if stream is None: stream = await self._jsm.find_stream_name_by_subject(subject) deliver = None consumer = None # If using a queue, that will be the consumer/durable name. if queue: if durable and durable != queue: raise nats.js.errors.Error(f"cannot create queue subscription '{queue}' to consumer '{durable}'") else: durable = queue consumer_info = None # Ephemeral subscribe always has to be auto created. should_create = not durable if durable: try: # TODO: Detect configuration drift with any present durable consumer. consumer_info = await self._jsm.consumer_info(stream, durable) consumer = durable except nats.js.errors.NotFoundError: should_create = True if consumer_info is not None: config = consumer_info.config # At this point, we know the user wants push mode, and the JS consumer is # really push mode. deliver_group = consumer_info.config.deliver_group if not deliver_group: # Prevent an user from attempting to create a queue subscription on # a JS consumer that was not created with a deliver group. if queue: # TODO: Currently, this would not happen in client # since the queue name is used as durable name. raise nats.js.errors.Error( "cannot create a queue subscription for a consumer without a deliver group" ) elif consumer_info.push_bound: # Need to reject a non queue subscription to a non queue consumer # if the consumer is already bound. raise nats.js.errors.Error("consumer is already bound to a subscription") else: if not queue: raise nats.js.errors.Error( f"cannot create a subscription for a consumer with a deliver group {deliver_group}" ) elif queue != deliver_group: raise nats.js.errors.Error( f"cannot create a queue subscription {queue} for a consumer " f"with a deliver group {deliver_group}" ) elif should_create: # Auto-create consumer if none found. if config is None: config = api.ConsumerConfig() if not config.durable_name: config.durable_name = durable if not config.deliver_group: config.deliver_group = queue if not config.headers_only: config.headers_only = headers_only if deliver_policy: # NOTE: deliver_policy is defaulting to ALL so check is different for this one. config.deliver_policy = deliver_policy if inactive_threshold: config.inactive_threshold = inactive_threshold # Create inbox for push consumer, if deliver_subject is not assigned already. if config.deliver_subject is None: deliver = self._nc.new_inbox() config.deliver_subject = deliver # Auto created consumers use the filter subject, unless filter_subjects is set. if not config.filter_subjects: config.filter_subject = subject # Heartbeats / FlowControl config.flow_control = flow_control if idle_heartbeat: config.idle_heartbeat = idle_heartbeat else: idle_heartbeat = config.idle_heartbeat or 5 # Enable ordered consumer mode where at most there is # one message being delivered at a time. if ordered_consumer: config.flow_control = True config.ack_policy = api.AckPolicy.NONE config.max_deliver = 1 config.ack_wait = 22 * 3600 # 22 hours config.idle_heartbeat = idle_heartbeat config.num_replicas = 1 config.mem_storage = True consumer_info = await self._jsm.add_consumer(stream, config=config) consumer = consumer_info.name if consumer is None: raise TypeError("cannot detect consumer") if config is None: raise TypeError("config is required for existing durable consumer") return await self.subscribe_bind( cb=cb, stream=stream, config=config, manual_ack=manual_ack, ordered_consumer=ordered_consumer, consumer=consumer, pending_msgs_limit=pending_msgs_limit, pending_bytes_limit=pending_bytes_limit, ) async def subscribe_bind( self, stream: str, config: api.ConsumerConfig, consumer: str, cb: Optional[Callback] = None, manual_ack: bool = False, ordered_consumer: bool = False, pending_msgs_limit: int = DEFAULT_JS_SUB_PENDING_MSGS_LIMIT, pending_bytes_limit: int = DEFAULT_JS_SUB_PENDING_BYTES_LIMIT, ) -> PushSubscription: """Push-subscribe to an existing consumer.""" # By default, async subscribers wrap the original callback and # auto ack the messages as they are delivered. # # In case ack policy is none then we also do not require to ack. # if cb and (not manual_ack) and (config.ack_policy is not api.AckPolicy.NONE): cb = self._auto_ack_callback(cb) if config.deliver_subject is None: raise TypeError("config.deliver_subject is required") sub = await self._nc.subscribe( subject=config.deliver_subject, queue=config.deliver_group or "", cb=cb, pending_msgs_limit=pending_msgs_limit, pending_bytes_limit=pending_bytes_limit, ) psub = JetStreamContext.PushSubscription(self, sub, stream, consumer) # Keep state to support ordered consumers. sub._jsi = JetStreamContext._JSI( js=self, conn=self._nc, stream=stream, ordered=ordered_consumer, psub=psub, sub=sub, ccreq=config, ) if config.idle_heartbeat: sub._jsi._hbtask = asyncio.create_task(sub._jsi.activity_check()) if ordered_consumer: sub._jsi._fctask = asyncio.create_task(sub._jsi.check_flow_control_response()) return psub @staticmethod def _auto_ack_callback(callback: Callback) -> Callback: async def new_callback(msg: Msg) -> None: await callback(msg) try: await msg.ack() except nats.errors.MsgAlreadyAckdError: pass return new_callback async def pull_subscribe( self, subject: str, durable: Optional[str] = None, stream: Optional[str] = None, config: Optional[api.ConsumerConfig] = None, pending_msgs_limit: int = DEFAULT_JS_SUB_PENDING_MSGS_LIMIT, pending_bytes_limit: int = DEFAULT_JS_SUB_PENDING_BYTES_LIMIT, inbox_prefix: Optional[bytes] = None, ) -> JetStreamContext.PullSubscription: """Create consumer and pull subscription. 1. Find stream name by subject if `stream` is not passed. 2. Create consumer with the given `config` if not created. 3. Call `pull_subscribe_bind`. :: import asyncio import nats async def main(): nc = await nats.connect() js = nc.jetstream() await js.add_stream(name='mystream', subjects=['foo']) await js.publish('foo', b'Hello World!') sub = await js.pull_subscribe('foo', stream='mystream') msgs = await sub.fetch() msg = msgs[0] await msg.ack() await nc.close() if __name__ == '__main__': asyncio.run(main()) """ if stream is None: stream = await self._jsm.find_stream_name_by_subject(subject) should_create = True try: if durable: await self._jsm.consumer_info(stream, durable) should_create = False except nats.js.errors.NotFoundError: pass consumer_name = durable if should_create: # If not found then attempt to create with the defaults. if config is None: config = api.ConsumerConfig() # Auto created consumers use the filter subject, unless filter_subjects is set. if not config.filter_subjects: config.filter_subject = subject if durable: config.name = durable config.durable_name = durable else: consumer_name = self._nc._nuid.next().decode() config.name = consumer_name await self._jsm.add_consumer(stream, config=config) return await self.pull_subscribe_bind( durable=durable, stream=stream, inbox_prefix=inbox_prefix, pending_bytes_limit=pending_bytes_limit, pending_msgs_limit=pending_msgs_limit, name=consumer_name, ) async def pull_subscribe_bind( self, consumer: Optional[str] = None, stream: Optional[str] = None, inbox_prefix: Optional[bytes] = None, pending_msgs_limit: int = DEFAULT_JS_SUB_PENDING_MSGS_LIMIT, pending_bytes_limit: int = DEFAULT_JS_SUB_PENDING_BYTES_LIMIT, name: Optional[str] = None, durable: Optional[str] = None, ) -> JetStreamContext.PullSubscription: """ pull_subscribe returns a `PullSubscription` that can be delivered messages from a JetStream pull based consumer by calling `sub.fetch`. :: import asyncio import nats async def main(): nc = await nats.connect() js = nc.jetstream() await js.add_stream(name='mystream', subjects=['foo']) await js.publish('foo', b'Hello World!') msgs = await sub.fetch() msg = msgs[0] await msg.ack() await nc.close() if __name__ == '__main__': asyncio.run(main()) """ if not stream: raise ValueError("nats: stream name is required") if inbox_prefix is None: inbox_prefix = bytes(self._nc._inbox_prefix[:]) + b"." deliver = inbox_prefix + self._nc._nuid.next() sub = await self._nc.subscribe( deliver.decode(), pending_msgs_limit=pending_msgs_limit, pending_bytes_limit=pending_bytes_limit, ) consumer_name = None # In nats.py v2.7.0 changing the first arg to be 'consumer' instead of 'durable', # but continue to support for backwards compatibility. if durable: consumer_name = durable elif name: # This should not be common and 'consumer' arg preferred instead but support anyway. consumer_name = name else: consumer_name = consumer return JetStreamContext.PullSubscription( js=self, sub=sub, stream=stream, consumer=consumer_name, deliver=deliver, ) @classmethod def is_status_msg(cls, msg: Optional[Msg]) -> Optional[str]: if msg is None or msg.headers is None: return None return msg.headers.get(api.Header.STATUS) @classmethod def _is_processable_msg(cls, status: Optional[str], msg: Msg) -> bool: if not status: return True # Skip most 4XX errors and do not raise exception. if JetStreamContext._is_temporary_error(status): return False raise nats.js.errors.APIError.from_msg(msg) @classmethod def _is_temporary_error(cls, status: Optional[str]) -> bool: if ( status == api.StatusCode.NO_MESSAGES or status == api.StatusCode.CONFLICT or status == api.StatusCode.REQUEST_TIMEOUT ): return True else: return False @classmethod def _is_heartbeat(cls, status: Optional[str]) -> bool: if status == api.StatusCode.CONTROL_MESSAGE: return True else: return False @classmethod def _time_until(cls, timeout: Optional[float], start_time: float) -> Optional[float]: if timeout is None: return None return timeout - (time.monotonic() - start_time) class _JSI: def __init__( self, js: JetStreamContext, conn: NATS, stream: str, ordered: Optional[bool], psub: JetStreamContext.PushSubscription, sub: Subscription, ccreq: api.ConsumerConfig, ) -> None: self._conn = conn self._js = js self._stream = stream self._ordered = ordered self._psub = psub self._sub = sub self._ccreq = ccreq # Heartbeat self._hbtask = None self._hbi = None if ccreq and ccreq.idle_heartbeat: self._hbi = ccreq.idle_heartbeat # Ordered Consumer implementation. self._dseq = 1 self._sseq = 0 self._cmeta: Optional[str] = None self._fcr: Optional[str] = None self._fcd = 0 self._fciseq = 0 self._active: Optional[bool] = True self._fctask = None def track_sequences(self, reply: str) -> None: self._fciseq += 1 self._cmeta = reply def schedule_flow_control_response(self, reply: str) -> None: self._active = True self._fcr = reply self._fcd = self._fciseq def get_js_delivered(self): if self._sub._cb: return self._sub.delivered return self._fciseq - self._sub._pending_queue.qsize() async def activity_check(self): # Can at most miss two heartbeats. hbc_threshold = 2 while True: try: if self._conn.is_closed: break # Wait for all idle heartbeats to be received, # one of them would have toggled the state of the # consumer back to being active. await asyncio.sleep(self._hbi * hbc_threshold) active = self._active self._active = False if not active: if self._ordered: await self.reset_ordered_consumer(self._sseq + 1) except asyncio.CancelledError: break async def check_flow_control_response(self): while True: try: if self._conn.is_closed: break if (self._fciseq - self._psub._pending_queue.qsize()) >= self._fcd: fc_reply = self._fcr try: if fc_reply: await self._conn.publish(fc_reply) except Exception: pass self._fcr = None self._fcd = 0 await asyncio.sleep(0.25) except asyncio.CancelledError: break async def check_for_sequence_mismatch(self, msg: Msg) -> Optional[bool]: self._active = True if not self._cmeta: return None tokens = msg._get_metadata_fields(self._cmeta) dseq = int(tokens[6]) # consumer sequence ldseq = None if msg.headers: ldseq_str = msg.headers.get(api.Header.LAST_CONSUMER) if ldseq_str: ldseq = int(ldseq_str) did_reset = None if ldseq != dseq: sseq = int(tokens[5]) # stream sequence if self._ordered: did_reset = await self.reset_ordered_consumer(self._sseq + 1) else: ecs = nats.js.errors.ConsumerSequenceMismatchError( stream_resume_sequence=sseq, consumer_sequence=dseq, last_consumer_sequence=ldseq, ) await self._conn._error_cb(ecs) return did_reset async def reset_ordered_consumer(self, sseq: Optional[int]) -> bool: # FIXME: Handle AUTO_UNSUB called previously to this. # Replace current subscription. osid = self._sub._id self._conn._remove_sub(osid) new_deliver = self._conn.new_inbox() # Place new one. self._conn._sid += 1 nsid = self._conn._sid self._conn._subs[nsid] = self._sub self._sub._id = nsid self._psub._id = nsid # unsub await self._conn._send_unsubscribe(osid) # resub self._sub._subject = new_deliver await self._conn._send_subscribe(self._sub) # relinquish cpu to let proto commands make it to the server. await asyncio.sleep(0) # Reset some items in jsi. self._cmeta = None self._dseq = 1 # Reset consumer request for starting policy. config = self._ccreq config.deliver_subject = new_deliver config.deliver_policy = api.DeliverPolicy.BY_START_SEQUENCE config.opt_start_seq = sseq self._ccreq = config # Handle the creation of new consumer in a background task # to avoid blocking the process_msg coroutine further # when making the request. asyncio.create_task(self.recreate_consumer()) return True async def recreate_consumer(self) -> None: try: cinfo = await self._js._jsm.add_consumer(self._stream, config=self._ccreq, timeout=self._js._timeout) self._psub._consumer = cinfo.name except Exception as err: await self._conn._error_cb(err) class PushSubscription(Subscription): """ PushSubscription is a subscription that is delivered messages. """ def __init__( self, js: JetStreamContext, sub: Subscription, stream: str, consumer: str, ) -> None: self._js = js self._stream = stream self._consumer = consumer self._sub = sub self._conn = sub._conn self._id = sub._id self._subject = sub._subject self._queue = sub._queue self._max_msgs = sub._max_msgs self._received = sub._received self._cb = sub._cb self._future = sub._future self._closed = sub._closed # Per subscription message processor. self._pending_msgs_limit = sub._pending_msgs_limit self._pending_bytes_limit = sub._pending_bytes_limit self._pending_queue = sub._pending_queue self._pending_size = sub._pending_size self._wait_for_msgs_task = sub._wait_for_msgs_task self._message_iterator = sub._message_iterator self._pending_next_msgs_calls = sub._pending_next_msgs_calls async def consumer_info(self) -> api.ConsumerInfo: """ consumer_info gets the current info of the consumer from this subscription. """ info = await self._js._jsm.consumer_info( self._stream, self._consumer, ) return info @property def delivered(self) -> int: """ Number of delivered messages to this subscription so far. """ return self._sub._received @delivered.setter def delivered(self, value): self._sub._received = value @property def _pending_size(self): return self._sub._pending_size @_pending_size.setter def _pending_size(self, value): self._sub._pending_size = value async def next_msg(self, timeout: Optional[float] = 1.0) -> Msg: """ :params timeout: Time in seconds to wait for next message before timing out. :raises nats.errors.TimeoutError: next_msg can be used to retrieve the next message from a stream of messages using await syntax, this only works when not passing a callback on `subscribe`:: """ msg = await self._sub.next_msg(timeout) # In case there is a flow control reply present need to handle here. if self._sub and self._sub._jsi: self._sub._jsi._active = True if self._sub._jsi.get_js_delivered() >= self._sub._jsi._fciseq: fc_reply = self._sub._jsi._fcr if fc_reply: await self._conn.publish(fc_reply) self._sub._jsi._fcr = None return msg async def unsubscribe(self, limit: int = 0): """ Unsubscribes from a subscription, canceling any heartbeat and flow control tasks, and optionally limits the number of messages to process before unsubscribing. """ await super().unsubscribe(limit) if self._sub._jsi._hbtask: self._sub._jsi._hbtask.cancel() if self._sub._jsi._fctask: self._sub._jsi._fctask.cancel() class PullSubscription: """ PullSubscription is a subscription that can fetch messages. """ def __init__( self, js: JetStreamContext, sub: Subscription, stream: str, consumer: str, deliver: bytes, ) -> None: # JS/JSM context self._js = js self._nc = js._nc # NATS Subscription self._sub = sub self._stream = stream self._consumer = consumer prefix = self._js._prefix self._nms = f"{prefix}.CONSUMER.MSG.NEXT.{stream}.{consumer}" self._deliver = deliver.decode() @property def pending_msgs(self) -> int: """ Number of delivered messages by the NATS Server that are being buffered in the pending queue. """ return self._sub._pending_queue.qsize() @property def pending_bytes(self) -> int: """ Size of data sent by the NATS Server that is being buffered in the pending queue. """ return self._sub._pending_size @property def delivered(self) -> int: """ Number of delivered messages to this subscription so far. """ return self._sub._received async def unsubscribe(self) -> None: """ unsubscribe destroys the inboxes of the pull subscription making it unable to continue to receive messages. """ if self._sub is None: raise ValueError("nats: invalid subscription") await self._sub.unsubscribe() async def consumer_info(self) -> api.ConsumerInfo: """ consumer_info gets the current info of the consumer from this subscription. """ info = await self._js._jsm.consumer_info(self._stream, self._consumer) return info async def fetch( self, batch: int = 1, timeout: Optional[float] = 5, heartbeat: Optional[float] = None, ) -> List[Msg]: """ fetch makes a request to JetStream to be delivered a set of messages. :param batch: Number of messages to fetch from server. :param timeout: Max duration of the fetch request before it expires. :param heartbeat: Idle Heartbeat interval in seconds for the fetch request. :: import asyncio import nats async def main(): nc = await nats.connect() js = nc.jetstream() await js.add_stream(name='mystream', subjects=['foo']) await js.publish('foo', b'Hello World!') msgs = await sub.fetch(5) for msg in msgs: await msg.ack() await nc.close() if __name__ == '__main__': asyncio.run(main()) """ if self._sub is None: raise ValueError("nats: invalid subscription") # FIXME: Check connection is not closed, etc... if batch < 1: raise ValueError("nats: invalid batch size") if timeout is not None and timeout <= 0: raise ValueError("nats: invalid fetch timeout") expires = int(timeout * 1_000_000_000) - 100_000 if timeout else None if batch == 1: msg = await self._fetch_one(expires, timeout, heartbeat) return [msg] msgs = await self._fetch_n(batch, expires, timeout, heartbeat) return msgs async def _fetch_one( self, expires: Optional[int], timeout: Optional[float], heartbeat: Optional[float] = None, ) -> Msg: queue = self._sub._pending_queue # Check the next message in case there are any. while not queue.empty(): try: msg = queue.get_nowait() self._sub._pending_size -= len(msg.data) status = JetStreamContext.is_status_msg(msg) if status: # Discard status messages at this point since were meant # for other fetch requests. continue return msg except Exception: # Fallthrough to make request in case this failed. pass # Make lingering request with expiration and wait for response. next_req = {} next_req["batch"] = 1 if expires: next_req["expires"] = int(expires) if heartbeat: next_req["idle_heartbeat"] = int(heartbeat * 1_000_000_000) # to nanoseconds await self._nc.publish( self._nms, json.dumps(next_req).encode(), self._deliver, ) start_time = time.monotonic() got_any_response = False while True: try: deadline = JetStreamContext._time_until(timeout, start_time) # Wait for the response or raise timeout. msg = await self._sub.next_msg(timeout=deadline) # Should have received at least a processable message at this point, status = JetStreamContext.is_status_msg(msg) if status: if JetStreamContext._is_heartbeat(status): got_any_response = True continue # In case of a temporary error, treat it as a timeout to retry. if JetStreamContext._is_temporary_error(status): raise nats.errors.TimeoutError else: # Any other type of status message is an error. raise nats.js.errors.APIError.from_msg(msg) else: return msg except asyncio.TimeoutError: deadline = JetStreamContext._time_until(timeout, start_time) if deadline is not None and deadline < 0: # No response from the consumer could have been # due to a reconnect while the fetch request, # the JS API not responding on time, or maybe # there were no messages yet. if got_any_response: raise FetchTimeoutError raise async def _fetch_n( self, batch: int, expires: Optional[int], timeout: Optional[float], heartbeat: Optional[float] = None, ) -> List[Msg]: msgs = [] queue = self._sub._pending_queue start_time = time.monotonic() got_any_response = False needed = batch # Fetch as many as needed from the internal pending queue. msg: Optional[Msg] while not queue.empty(): try: msg = queue.get_nowait() self._sub._pending_size -= len(msg.data) status = JetStreamContext.is_status_msg(msg) if status: # Discard status messages at this point since were meant # for other fetch requests. continue needed -= 1 msgs.append(msg) except Exception: pass # First request: Use no_wait to synchronously get as many available # based on the batch size until server sends 'No Messages' status msg. next_req = {} next_req["batch"] = needed if expires: next_req["expires"] = expires if heartbeat: next_req["idle_heartbeat"] = int(heartbeat * 1_000_000_000) # to nanoseconds next_req["no_wait"] = True await self._nc.publish( self._nms, json.dumps(next_req).encode(), self._deliver, ) await asyncio.sleep(0) try: msg = await self._sub.next_msg(timeout) except asyncio.TimeoutError: # Return any message that was already available in the internal queue. if msgs: return msgs raise got_any_response = False status = JetStreamContext.is_status_msg(msg) if JetStreamContext._is_heartbeat(status): # Mark that we got any response from the server so this is not # a possible i/o timeout error or due to a disconnection. got_any_response = True pass elif JetStreamContext._is_processable_msg(status, msg): # First processable message received, do not raise error from now. msgs.append(msg) needed -= 1 try: for i in range(0, needed): deadline = JetStreamContext._time_until(timeout, start_time) msg = await self._sub.next_msg(timeout=deadline) status = JetStreamContext.is_status_msg(msg) if status == api.StatusCode.NO_MESSAGES or status == api.StatusCode.REQUEST_TIMEOUT: # No more messages after this so fallthrough # after receiving the rest. break elif JetStreamContext._is_heartbeat(status): # Skip heartbeats. got_any_response = True continue elif JetStreamContext._is_processable_msg(status, msg): needed -= 1 msgs.append(msg) except asyncio.TimeoutError: # Ignore any timeout errors at this point since # at least one message has already arrived. pass # Stop if have some messages. if len(msgs) > 0: return msgs # Second request: lingering request that will block until new messages # are made available and delivered to the client. next_req = {} next_req["batch"] = needed if expires: next_req["expires"] = expires if heartbeat: next_req["idle_heartbeat"] = int(heartbeat * 1_000_000_000) # to nanoseconds await self._nc.publish( self._nms, json.dumps(next_req).encode(), self._deliver, ) await asyncio.sleep(0) # Get the immediate next message which could be a status message # or a processable message. msg = None while True: # Check if already got enough at this point. if needed == 0: return msgs deadline = JetStreamContext._time_until(timeout, start_time) if len(msgs) == 0: # Not a single processable message has been received so far, # if this timed out then let the error be raised. try: msg = await self._sub.next_msg(timeout=deadline) except asyncio.TimeoutError: if got_any_response: raise FetchTimeoutError raise else: try: msg = await self._sub.next_msg(timeout=deadline) except asyncio.TimeoutError: # Ignore any timeout since already got at least a message. break if msg: status = JetStreamContext.is_status_msg(msg) if JetStreamContext._is_heartbeat(status): got_any_response = True continue if not status: needed -= 1 msgs.append(msg) break elif status == api.StatusCode.NO_MESSAGES or status: # If there is still time, try again to get the next message # or timeout. This could be due to concurrent uses of fetch # with the same inbox. break elif len(msgs) == 0: raise nats.js.errors.APIError.from_msg(msg) # Wait for the rest of the messages to be delivered to the internal pending queue. try: for _ in range(needed): deadline = JetStreamContext._time_until(timeout, start_time) if deadline is not None and deadline < 0: return msgs msg = await self._sub.next_msg(timeout=deadline) status = JetStreamContext.is_status_msg(msg) if JetStreamContext._is_heartbeat(status): got_any_response = True continue if JetStreamContext._is_processable_msg(status, msg): needed -= 1 msgs.append(msg) except asyncio.TimeoutError: # Ignore any timeout errors at this point since # at least one message has already arrived. pass if len(msgs) == 0 and got_any_response: raise FetchTimeoutError return msgs ###################### # # # KeyValue Context # # # ###################### async def key_value(self, bucket: str) -> KeyValue: if VALID_BUCKET_RE.match(bucket) is None: raise InvalidBucketNameError stream = KV_STREAM_TEMPLATE.format(bucket=bucket) try: si = await self.stream_info(stream) except NotFoundError: raise BucketNotFoundError if si.config.max_msgs_per_subject < 1: raise BadBucketError return KeyValue( name=bucket, stream=stream, pre=KV_PRE_TEMPLATE.format(bucket=bucket), js=self, direct=bool(si.config.allow_direct), ) async def create_key_value( self, config: Optional[api.KeyValueConfig] = None, **params, ) -> KeyValue: """ create_key_value takes an api.KeyValueConfig and creates a KV in JetStream. """ if config is None: config = api.KeyValueConfig(bucket=params["bucket"]) config = config.evolve(**params) if VALID_BUCKET_RE.match(config.bucket) is None: raise InvalidBucketNameError duplicate_window: float = 2 * 60 # 2 minutes if config.ttl and config.ttl < duplicate_window: duplicate_window = config.ttl if config.history > 64: raise nats.js.errors.KeyHistoryTooLargeError stream = api.StreamConfig( name=KV_STREAM_TEMPLATE.format(bucket=config.bucket), description=config.description, subjects=[f"$KV.{config.bucket}.>"], allow_direct=config.direct, allow_rollup_hdrs=True, allow_msg_ttl=True, deny_delete=True, discard=api.DiscardPolicy.NEW, duplicate_window=duplicate_window, max_age=config.ttl, max_bytes=config.max_bytes, max_consumers=-1, max_msg_size=config.max_value_size, max_msgs=-1, max_msgs_per_subject=config.history, num_replicas=config.replicas, storage=config.storage, republish=config.republish, ) si = await self.add_stream(stream) assert stream.name is not None return KeyValue( name=config.bucket, stream=stream.name, pre=KV_PRE_TEMPLATE.format(bucket=config.bucket), js=self, direct=bool(si.config.allow_direct), ) async def delete_key_value(self, bucket: str) -> bool: """ delete_key_value deletes a JetStream KeyValue store by destroying the associated stream. """ if VALID_BUCKET_RE.match(bucket) is None: raise InvalidBucketNameError stream = KV_STREAM_TEMPLATE.format(bucket=bucket) return await self.delete_stream(stream) ####################### # # # ObjectStore Context # # # ####################### async def object_store(self, bucket: str) -> ObjectStore: if VALID_BUCKET_RE.match(bucket) is None: raise InvalidBucketNameError stream = OBJ_STREAM_TEMPLATE.format(bucket=bucket) try: await self.stream_info(stream) except NotFoundError: raise BucketNotFoundError return ObjectStore( name=bucket, stream=stream, js=self, ) async def create_object_store( self, bucket: str = None, config: Optional[api.ObjectStoreConfig] = None, **params, ) -> ObjectStore: """ create_object_store takes an api.ObjectStoreConfig and creates a OBJ in JetStream. """ if config is None: config = api.ObjectStoreConfig(bucket=bucket) else: config.bucket = bucket config = config.evolve(**params) if VALID_BUCKET_RE.match(config.bucket) is None: raise InvalidBucketNameError name = config.bucket chunks = OBJ_ALL_CHUNKS_PRE_TEMPLATE.format(bucket=name) meta = OBJ_ALL_META_PRE_TEMPLATE.format(bucket=name) max_bytes = config.max_bytes if max_bytes == 0: max_bytes = -1 stream = api.StreamConfig( name=OBJ_STREAM_TEMPLATE.format(bucket=config.bucket), description=config.description, subjects=[chunks, meta], max_age=config.ttl, max_bytes=max_bytes, max_consumers=0, storage=config.storage, num_replicas=config.replicas, placement=config.placement, discard=api.DiscardPolicy.NEW, allow_rollup_hdrs=True, allow_direct=True, ) await self.add_stream(stream) assert stream.name is not None return ObjectStore( name=config.bucket, stream=stream.name, js=self, ) async def delete_object_store(self, bucket: str) -> bool: """ delete_object_store will delete the underlying stream for the named object. """ if VALID_BUCKET_RE.match(bucket) is None: raise InvalidBucketNameError stream = OBJ_STREAM_TEMPLATE.format(bucket=bucket) return await self.delete_stream(stream)
.
Edit
..
Edit
__init__.py
Edit
__pycache__
Edit
api.py
Edit
client.py
Edit
errors.py
Edit
kv.py
Edit
manager.py
Edit
object_store.py
Edit