Overview
The RWS (Real‑time WebSocket) service continuously pushes market data to your application. A common operational question is whether the data received from RWS is persisted locally on the host machine. This article explains how RWS data handling works, what storage options are available, and how you can verify or control local storage.
1. How RWS Data Is Handled by Default
- In‑memory streaming – By default, RWS delivers data as a stream that your application processes in real time. The data lives only in the process memory while your code reads it.
- No automatic local persistence – RWS does not write the raw feed to a file or database on your machine unless you explicitly implement that logic.
Key point: If you do not add any storage code, the data is not stored locally.
2. Options for Local Data Storage
You can choose to store RWS data locally for several reasons (audit, offline analysis, backup, etc.). Below are the most common approaches:
### 2.1 Write to a Local File
```python
import json
import time
from pathlib import Path


def store_message(msg: dict, folder: Path = Path("./rws_logs")) -> None:
    """
    Append a single RWS message to a daily JSON‑lines file.
    """
    folder.mkdir(parents=True, exist_ok=True)
    file_path = folder / f"{time.strftime('%Y-%m-%d')}.jsonl"
    # Use JSON‑lines format – one JSON object per line – which is easy to stream‑append
    # and to ingest later with tools like pandas, jq, or Splunk.
    with file_path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(msg, separators=(",", ":")))
        f.write("\n")
```
*Why JSON‑lines?*
- Append‑only – no need to read‑modify‑write the whole file.
- Each line is a self‑contained record, making it resilient to partial writes or crashes.
- Compatible with most analytics pipelines.
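To make the crash-resilience point concrete, here is a minimal sketch of a reader for these files that skips a torn line (typically the last one after a crash) instead of aborting the whole load. `read_jsonl` is a hypothetical helper name, not part of any RWS SDK.

```python
import json
import logging
from pathlib import Path
from typing import Iterator


def read_jsonl(path: Path) -> Iterator[dict]:
    """Hypothetical helper: yield one dict per complete JSON line, skipping damaged lines."""
    with path.open("r", encoding="utf-8") as f:
        for line_no, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue  # tolerate blank lines
            try:
                yield json.loads(line)
            except json.JSONDecodeError:
                # A crash mid-write leaves at most one partial record; skipping it
                # keeps every other record in the file usable.
                logging.warning("skipping malformed line %d in %s", line_no, path)
```

Because each record stands alone, a damaged line costs one message rather than the whole file.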
> **Tip:** Rotate files based on size or time (e.g., every 100 MB or every hour) to avoid a single massive log that becomes hard to manage.
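For size-based rotation, the standard library's `logging.handlers.RotatingFileHandler` can do the bookkeeping. The sketch below shows one way to use it for JSON-lines output; the helper names, the `rws.jsonl` file name, and the defaults (100 MB, 10 backups) are illustrative assumptions. For hourly rotation, `logging.handlers.TimedRotatingFileHandler(..., when="H")` is the time-based counterpart.

```python
import json
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path


def make_rotating_writer(folder: Path, max_bytes: int = 100 * 1024 * 1024,
                         backup_count: int = 10) -> logging.Logger:
    """Hypothetical helper: a logger that appends JSON-lines records and rotates by size."""
    folder.mkdir(parents=True, exist_ok=True)
    logger = logging.getLogger(f"rws_writer.{folder.resolve()}")
    if not logger.handlers:  # avoid stacking duplicate handlers on repeated calls
        handler = RotatingFileHandler(
            str(folder / "rws.jsonl"), maxBytes=max_bytes,
            backupCount=backup_count, encoding="utf-8",
        )
        # Emit the message verbatim: one JSON object per line, no level/time prefix
        handler.setFormatter(logging.Formatter("%(message)s"))
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
        logger.propagate = False  # keep market data out of the application's normal logs
    return logger


def write_record(writer: logging.Logger, msg: dict) -> None:
    """Hypothetical helper: serialize one message compactly and hand it to the writer."""
    writer.info(json.dumps(msg, separators=(",", ":")))
```

Rolled-over files are named `rws.jsonl.1`, `rws.jsonl.2`, and so on; once `backup_count` is reached the oldest file is deleted, so size that parameter for your retention needs.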
---
### 2.2 Persist to an Embedded Database
| Embedded DB | Strengths | Typical Use‑Case |
|-------------|-----------|------------------|
| **SQLite** | Zero‑config, ACID, SQL queries, file‑based | Quick ad‑hoc queries, small‑to‑medium data volumes |
| **DuckDB** | Columnar storage, fast analytical queries, Parquet‑compatible | Heavy analytics, time‑series aggregations |
| **LevelDB / RocksDB** | Key‑value, high write throughput, on‑disk LSM trees | Very high‑frequency tick data, deduplication |
**Example – SQLite**
```python
import json
import sqlite3
from datetime import datetime


def init_db(db_path: str = "rws_data.db") -> sqlite3.Connection:
    conn = sqlite3.connect(db_path, detect_types=sqlite3.PARSE_DECLTYPES)
    cur = conn.cursor()
    cur.execute(
        """
        CREATE TABLE IF NOT EXISTS market_tick (
            ts        TIMESTAMP NOT NULL,
            symbol    TEXT NOT NULL,
            bid_price REAL,
            ask_price REAL,
            bid_size  INTEGER,
            ask_size  INTEGER,
            raw_json  TEXT
        )
        """
    )
    conn.commit()
    return conn


def store_tick(conn: sqlite3.Connection, tick: dict) -> None:
    cur = conn.cursor()
    cur.execute(
        """
        INSERT INTO market_tick (ts, symbol, bid_price, ask_price, bid_size, ask_size, raw_json)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        """,
        (
            # tick["timestamp"] is in epoch milliseconds, hence the division by 1 000
            datetime.fromtimestamp(tick["timestamp"] / 1_000),
            tick["symbol"],
            tick.get("bidPrice"),
            tick.get("askPrice"),
            tick.get("bidSize"),
            tick.get("askSize"),
            json.dumps(tick),  # keep the original message for auditing
        ),
    )
    conn.commit()
```
*Performance note:* For high‑throughput streams (≥10 k messages/second) batch inserts in groups of 1 k–5 k rows and wrap them in a single transaction. This reduces the per‑row overhead dramatically.
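A minimal sketch of that batching advice, reusing the `market_tick` schema from `init_db` above: `executemany` inserts a whole group of rows, and `with conn:` commits once per group (or rolls the group back on error) instead of once per row. The function names and the default `batch_size` of 1 000 are illustrative choices within the suggested range.

```python
import json
import sqlite3
from datetime import datetime

INSERT_SQL = (
    "INSERT INTO market_tick (ts, symbol, bid_price, ask_price, bid_size, ask_size, raw_json) "
    "VALUES (?, ?, ?, ?, ?, ?, ?)"
)


def _to_row(tick: dict) -> tuple:
    """Hypothetical helper: map one tick dict to a market_tick row."""
    return (
        datetime.fromtimestamp(tick["timestamp"] / 1_000),  # epoch ms -> datetime
        tick["symbol"],
        tick.get("bidPrice"),
        tick.get("askPrice"),
        tick.get("bidSize"),
        tick.get("askSize"),
        json.dumps(tick),
    )


def store_ticks_batched(conn: sqlite3.Connection, ticks: list[dict],
                        batch_size: int = 1_000) -> None:
    """Hypothetical helper: insert ticks in groups, one transaction per group."""
    for start in range(0, len(ticks), batch_size):
        batch = ticks[start:start + batch_size]
        with conn:  # commits once when the block exits without an exception
            conn.executemany(INSERT_SQL, [_to_row(t) for t in batch])
```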
---
### 2.3 Stream Directly to a Time‑Series Store
If you already have a dedicated TSDB (e.g., **InfluxDB**, **TimescaleDB**, **Prometheus remote‑write**), you can push each message as it arrives:
```python
from datetime import datetime, timezone

import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS

client = influxdb_client.InfluxDBClient(
    url="http://localhost:8086",
    token="YOUR_TOKEN",
    org="my_org",
)
write_api = client.write_api(write_options=SYNCHRONOUS)


def write_to_influx(tick: dict) -> None:
    point = (
        influxdb_client.Point("rws_market_tick")
        .tag("symbol", tick["symbol"])
        .field("bid_price", tick.get("bidPrice"))
        .field("ask_price", tick.get("askPrice"))
        .field("bid_size", tick.get("bidSize"))
        .field("ask_size", tick.get("askSize"))
        # Explicit UTC so the epoch-ms timestamp is not shifted by the host's local zone
        .time(datetime.fromtimestamp(tick["timestamp"] / 1_000, tz=timezone.utc))
    )
    # "YOUR_BUCKET" is a placeholder; replace it with your target bucket
    write_api.write(bucket="YOUR_BUCKET", record=point)
```
*Advantages:*
- Built‑in down‑sampling and retention policies.
- Powerful query language (Flux, SQL) for historical analysis.
- Native integration with Grafana for real‑time dashboards.
---
### 2.4 Cloud‑Native Object Storage
When you need **long‑term archival** (months to years) and want to keep costs low, consider writing the stream to an object store such as **Amazon S3**, **Google Cloud Storage**, or **Azure Blob**. The typical pattern is:
1. Buffer messages in memory (or a local file) for a configurable interval (e.g., 5 minutes).
2. Compress the buffer using `gzip` or `zstd`.
3. Upload the compressed chunk with a deterministic key, e.g., `rws/2024/04/15/14-00-00.jsonl.gz`.
```python
import gzip
import json
import time
from io import BytesIO

import boto3

s3 = boto3.client("s3", region_name="us-east-1")
bucket = "my-rws-archive"


def upload_chunk(chunk: list[dict], start_ts: float) -> None:
    buf = BytesIO()
    # Write the chunk as gzip-compressed JSON lines into an in-memory buffer
    with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
        for msg in chunk:
            gz.write(json.dumps(msg).encode("utf-8"))
            gz.write(b"\n")
    buf.seek(0)  # rewind so upload_fileobj reads from the beginning
    key = f"rws/{time.strftime('%Y/%m/%d/%H-%M-%S', time.gmtime(start_ts))}.jsonl.gz"
    s3.upload_fileobj(buf, bucket, key)
```
Why compress? Market data is highly repetitive; gzip typically yields 5‑10× size reduction, dramatically lowering storage bills.
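Step 1 of the pattern above (buffering for a fixed interval) can be sketched as a small chunker that hands back `(chunk, start_ts)` in the shape `upload_chunk` expects. `IntervalChunker` is a hypothetical helper; the 300-second default mirrors the 5-minute example, and the clock is injectable so the logic can be tested without waiting.

```python
import time
from typing import Callable, Optional


class IntervalChunker:
    """Hypothetical helper: collect messages and release a chunk once interval_s has elapsed."""

    def __init__(self, interval_s: float = 300.0, clock: Callable[[], float] = time.time):
        self.interval_s = interval_s
        # Wall-clock time, because upload_chunk() derives the S3 key from start_ts
        self.clock = clock
        self.chunk: list[dict] = []
        self.start_ts: Optional[float] = None

    def add(self, msg: dict) -> Optional[tuple[list[dict], float]]:
        """Buffer msg; return (chunk, start_ts) when the interval is up, else None."""
        now = self.clock()
        if self.start_ts is None:
            self.start_ts = now  # the chunk is keyed by the arrival time of its first message
        self.chunk.append(msg)
        if now - self.start_ts >= self.interval_s:
            return self.drain()
        return None

    def drain(self) -> Optional[tuple[list[dict], float]]:
        """Return the pending chunk and reset (also useful on shutdown); None if empty."""
        if not self.chunk:
            return None
        ready = (self.chunk, self.start_ts)
        self.chunk, self.start_ts = [], None
        return ready
```

In the consumer loop, `ready = chunker.add(msg)` followed by `if ready: upload_chunk(*ready)` would work, with `chunker.drain()` called on shutdown so the final partial chunk is uploaded too. As written, a chunk closes only when a message arrives after the interval has elapsed.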
3. Verifying What Is Currently Stored
Even if you never added explicit persistence code, some environments (Docker containers, managed runtimes) may create temporary caches. To be certain:
| Platform | Where to Look | What to Check |
|---|---|---|
| Linux host | `lsof -p <pid>` | Open file descriptors that point to regular files. |
| Kubernetes | `kubectl exec <pod> -- df -h` and `kubectl logs` | Ensure logs are not being redirected to a persistent volume claim (PVC). |
| Docker | `docker exec <container> ls -l /var/lib/rws` (or any custom volume) | Verify that no volume is mounted for logs. |
| Windows | Resource Monitor → CPU → Associated Handles | Look for handles opened by your RWS client process. |
If you see only sockets (socket:[...]) and no regular files, the service is operating purely in memory.
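On Linux, the same check can be scripted without `lsof` by reading `/proc/<pid>/fd`, where each entry is a symlink to whatever the descriptor refers to. A minimal sketch, assuming a Linux host and permission to inspect the target process (same user or root); `open_regular_files` is a hypothetical helper name.

```python
import os
import stat
from pathlib import Path
from typing import Union


def open_regular_files(pid: Union[int, str] = "self") -> list[str]:
    """Hypothetical helper: regular files a process holds open (Linux /proc only)."""
    fd_dir = Path("/proc") / str(pid) / "fd"
    found = []
    for entry in fd_dir.iterdir():
        try:
            target = os.readlink(entry)
            mode = os.stat(entry).st_mode  # stat follows the link to the open object
        except OSError:
            continue  # the descriptor was closed while we were scanning
        if stat.S_ISREG(mode):  # sockets, pipes, ttys, and directories are skipped
            found.append(target)
    return sorted(found)
```

For a purely in-memory client, the result should be empty or contain only files you expect (for example, a redirected stdout log). Files that are opened and closed on every write appear only briefly, so repeat the check while data is flowing.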
4. Controlling Persistence – Best Practices
- Explicit is better than implicit – Always code the persistence path you need; rely on defaults only for transient, throw‑away pipelines.
- Separate concerns – Keep the consumer (the part that receives the WebSocket) independent from the writer (the part that persists). This makes it easy to swap storage back‑ends without touching the networking code.
- Graceful shutdown – On SIGINT/SIGTERM, flush any in‑memory buffers to disk before exiting. Example pattern:

  ```python
  import logging
  import signal
  import sys

  logger = logging.getLogger(__name__)


  def shutdown_handler(signum, frame):
      # `buffer` and `conn` are assumed to be defined elsewhere in your application
      logger.info("Shutdown signal received – flushing buffers")
      buffer.flush()  # if you use a custom buffer class
      conn.close()    # close DB connections
      sys.exit(0)


  signal.signal(signal.SIGINT, shutdown_handler)
  signal.signal(signal.SIGTERM, shutdown_handler)
  ```

- Back‑pressure handling – If the writer cannot keep up, consider:
  - Batching (as shown above).
  - Dropping non‑essential fields.
  - Using a bounded queue (`queue.Queue(maxsize=10_000)`) and applying `queue.Full` handling logic.
- Security & compliance – Encrypt data at rest if it contains sensitive pricing information. For on‑premises storage, enable filesystem encryption (e.g., LUKS) or use the DB's native encryption features. When writing to cloud object stores, enable server‑side encryption (`SSE-S3` or `SSE-KMS`).
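The separation-of-concerns and back-pressure points can be combined in one small pattern: the WebSocket consumer only calls `submit()`, which never blocks, and a background thread drains a bounded `queue.Queue` into whatever storage function you pass in. This is a minimal sketch; `DecoupledWriter`, its parameters, and the drop-on-full policy are illustrative assumptions, and blocking or spilling to disk are equally valid ways to handle `queue.Full`.

```python
import queue
import threading
from typing import Callable, Optional


class DecoupledWriter:
    """Hypothetical helper: consumer calls submit(); a worker thread persists batches."""

    def __init__(self, write_batch: Callable[[list[dict]], None], maxsize: int = 10_000,
                 batch_size: int = 1_000, flush_interval: float = 1.0):
        # write_batch can be any back-end (file, SQLite, S3 ...); the consumer never sees it
        self.write_batch = write_batch
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.dropped = 0  # messages rejected because the queue was full
        self.q: "queue.Queue[Optional[dict]]" = queue.Queue(maxsize=maxsize)
        self.worker = threading.Thread(target=self._run, daemon=True)
        self.worker.start()

    def submit(self, msg: dict) -> bool:
        """Enqueue without blocking the WebSocket loop; drop and count on overflow."""
        try:
            self.q.put_nowait(msg)
            return True
        except queue.Full:
            self.dropped += 1
            return False

    def close(self) -> None:
        """Call on shutdown: write everything still queued, then stop the worker."""
        self.q.put(None)  # sentinel; waits for room if the queue is full
        self.worker.join()

    def _run(self) -> None:
        batch: list[dict] = []
        while True:
            try:
                item = self.q.get(timeout=self.flush_interval)
            except queue.Empty:
                if batch:  # idle period: do not let a partial batch sit in memory
                    self.write_batch(batch)
                    batch = []
                continue
            if item is None:
                break
            batch.append(item)
            if len(batch) >= self.batch_size:
                self.write_batch(batch)
                batch = []
        if batch:
            self.write_batch(batch)
```

Monitoring `dropped` tells you when the writer is falling behind; a steadily rising count is a signal to batch more aggressively or move to a faster back-end.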
5. Sample End‑to‑End Skeleton (Python)
```python
import asyncio
import json
import time
from datetime import datetime, timezone
from pathlib import Path

import websockets

# ---------- Configuration ----------
WS_ENDPOINT = "wss://rws.example.com/stream"
SYMBOLS = ["AAPL", "MSFT", "GOOG"]
OUTPUT_DIR = Path("./rws_archive")
MAX_BATCH = 5_000     # messages per DB transaction / file write
FLUSH_INTERVAL = 5    # seconds

# ---------- In‑memory buffer ----------
buffer: list[dict] = []
last_flush = time.monotonic()  # monotonic clock: immune to wall-clock adjustments


# ---------- Persistence helpers ----------
def append_to_file(batch: list[dict]) -> None:
    """Append a batch as JSON‑lines to a daily (UTC) file."""
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    file_path = OUTPUT_DIR / f"{datetime.now(timezone.utc):%Y-%m-%d}.jsonl"
    with file_path.open("a", encoding="utf-8") as f:
        for msg in batch:
            f.write(json.dumps(msg, separators=(",", ":")))
            f.write("\n")


# ---------- WebSocket consumer ----------
async def consumer():
    global last_flush
    async with websockets.connect(WS_ENDPOINT) as ws:
        # Subscribe to the symbols we care about
        await ws.send(json.dumps({"type": "subscribe", "symbols": SYMBOLS}))
        async for raw in ws:
            msg = json.loads(raw)
            buffer.append(msg)
            # Flush when the batch is full or FLUSH_INTERVAL has passed since the last flush
            if len(buffer) >= MAX_BATCH or time.monotonic() - last_flush >= FLUSH_INTERVAL:
                append_to_file(buffer)
                buffer.clear()
                last_flush = time.monotonic()


# ---------- Main entry ----------
if __name__ == "__main__":
    try:
        asyncio.run(consumer())
    except KeyboardInterrupt:
        # Ensure any leftover messages are persisted
        if buffer:
            append_to_file(buffer)
        print("\nGraceful shutdown complete.")
```
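What this script demonstrates:
- No hidden persistence – the only place data is written is the `append_to_file` function.
- Batching & time‑based flushing – reduces I/O overhead and bounds how long messages wait in memory. The time check runs only when a message arrives, so during a quiet period buffered messages can wait longer than `FLUSH_INTERVAL` seconds, and a hard crash (as opposed to `Ctrl‑C`) loses whatever is still buffered.
- Graceful termination – catches `Ctrl‑C` and flushes any remaining messages.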
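To flush during quiet periods as well, one option is a timer-driven flush running as a separate asyncio task alongside the consumer. A minimal sketch; `periodic_flusher` is a hypothetical helper, and it relies on the single-threaded asyncio setup of the skeleton, where no other code can touch the buffer between the copy and the `clear()`.

```python
import asyncio
from typing import Callable


async def periodic_flusher(buffer: list[dict], flush: Callable[[list[dict]], None],
                           interval_s: float) -> None:
    """Hypothetical helper: flush `buffer` every interval_s seconds, even with no traffic."""
    while True:
        await asyncio.sleep(interval_s)
        if buffer:
            # No await between copy and clear, so no message can slip in and be lost
            flush(list(buffer))
            buffer.clear()
```

Inside `consumer()`, it could be started with `asyncio.create_task(periodic_flusher(buffer, append_to_file, FLUSH_INTERVAL))` before the `async for` loop and cancelled on exit. Because `flush` runs on the event loop, keep it fast or hand large writes to a thread.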
6. Frequently Asked Questions
| Question | Answer |
|---|---|
| **Does RWS keep a cache on disk for replay?** | No. As described in Section 1, RWS does not write the feed to disk; any replay must come from storage you implement. |
| **Can I enable "snapshot" mode from the server?** | Some vendors expose a historical snapshot endpoint over HTTP/REST. That is separate from the real‑time WebSocket and must be called explicitly. |
| **Will Docker's default logging driver store the data?** | Only what your application writes to stdout/stderr. If your app prints the messages, they will appear in container logs; otherwise they are not persisted. |
| **How do I guarantee exactly‑once writes?** | Use a transactional DB (SQLite, PostgreSQL) with `INSERT … ON CONFLICT DO NOTHING` or a deduplication key. For file‑based logs, include a UUID per message and run a periodic de‑duplication job. |
| **Is there a built‑in "record‑to‑disk" flag?** | No. Persistence must be implemented in your own code, as described in Section 2. |
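For the exactly-once question, here is a minimal SQLite sketch of the `INSERT … ON CONFLICT DO NOTHING` approach: the primary key acts as the deduplication key, so a message delivered or replayed twice is stored once. It assumes each message can be given a stable `msg_id` (whether RWS supplies one is not covered here; you may have to derive it, for example from symbol, timestamp, and a sequence number), and the upsert syntax needs SQLite 3.24 or newer. The table and function names are hypothetical.

```python
import json
import sqlite3


def init_dedup_db(path: str = ":memory:") -> sqlite3.Connection:
    """Hypothetical helper: open a DB whose primary key doubles as the dedup key."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS rws_message ("
        " msg_id TEXT PRIMARY KEY,"  # a repeated msg_id triggers the conflict clause
        " raw_json TEXT NOT NULL)"
    )
    return conn


def store_once(conn: sqlite3.Connection, msg_id: str, msg: dict) -> bool:
    """Hypothetical helper: insert unless msg_id is already stored; True if it was new."""
    with conn:  # one transaction per write
        cur = conn.execute(
            "INSERT INTO rws_message (msg_id, raw_json) VALUES (?, ?) "
            "ON CONFLICT(msg_id) DO NOTHING",
            (msg_id, json.dumps(msg)),
        )
    return cur.rowcount == 1  # 0 rows changed means it was a duplicate
```

Strictly speaking this makes writes idempotent: delivery may still happen more than once, but each message is persisted only once.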
7. TL;DR
- RWS does not store data locally by itself. It streams messages directly to your process.
- If you need persistence, you must add it—options include plain files, embedded databases (SQLite/DuckDB), time‑series stores, or cloud object storage.
- Verify the absence of hidden files with `lsof`, `docker exec`, or similar tools.
- Follow best‑practice patterns: explicit storage code, batching, graceful shutdown, and security‑by‑design.
Conclusion
Understanding the default behavior of the Real‑time WebSocket service is the first step toward building a reliable market‑data pipeline. By default, RWS is stateless on the client side, delivering a high‑velocity stream that lives only in memory. This gives you the freedom to decide how and where you want to keep that data—whether it’s a quick JSON‑lines log for debugging, an SQLite database for ad‑hoc queries, a dedicated TSDB for analytics, or an encrypted cloud bucket for long‑term compliance.
Implementing persistence deliberately, with clear separation between the consumer and writer, helps you avoid accidental data loss, keep resource usage predictable, and stay in control of security and cost. Use the patterns and code snippets above as a starting point, adapt them to your language and infrastructure, and you'll have a reliable, auditable RWS integration that meets both real‑time and historical analysis needs.