REQUIRED
A Thetadata Stocks Pro Subscription is required to use this endpoint.
Full Trade Stream
Behavior
This stream returns every US Stock trade reported on the Nasdaq Basic feed. A quote (the last BBO) and ohlc message for the contract that was traded is sent before the trade occurs. The Theta Terminal will continue to receive these messages unless it is terminated or you unsubscribe from the full trade stream.
Subscribe to the Full Trade Stream
The id
field should be increased for each new stream request made. This ID is returned in a later message to verify that the request to stream trades was successful. This ID does not have any representation of contracts or unqiue streams. It only represents a way of tracking streaming requests made. Failure to increment the ID for each request will prevent the terminal from automatically resubscribing to streams you previously requested.
Payload
{
"msg_type": "STREAM_BULK",
"sec_type": "STOCK",
"req_type": "TRADE",
"add": true,
"id": 0
}
Sample Code
REQUIRED
The Theta Terminal must be running for this code to work.
import asyncio
import websockets
# This code has only been tested on Python 3.11. Other versions might require adjustments.
async def stream_trades():
async with websockets.connect('ws://127.0.0.1:25520/v1/events') as websocket:
req = {}
req['msg_type'] = 'STREAM_BULK'
req['sec_type'] = 'STOCK'
req['req_type'] = 'TRADE'
req['add'] = True
req['id'] = 0
await websocket.send(req.__str__())
while True:
response = await websocket.recv()
print(response)
asyncio.get_event_loop().run_until_complete(stream_trades())
package main
import (
"encoding/json"
"fmt"
"log"
"net/url"
"os"
"os/signal"
"syscall"
"github.com/gorilla/websocket"
)
type InitialMessage struct {
MsgType string `json:"msg_type"`
SecType string `json:"sec_type"`
ReqType string `json:"req_type"`
Add bool `json:"add"`
ID int `json:"id"`
}
func main() {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)
u := url.URL{Scheme: "ws", Host: "127.0.0.1:25520", Path: "/v1/events"}
log.Printf("connecting to %s", u.String())
c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
log.Fatal("dial:", err)
}
defer c.Close()
done := make(chan struct{})
go func() {
defer close(done)
for {
_, message, err := c.ReadMessage()
if err != nil {
log.Println("read:", err)
return
}
fmt.Printf("%s\n", message)
}
}()
initialMessage := InitialMessage{
MsgType: "STREAM_BULK",
SecType: "STOCK",
ReqType: "TRADE",
Add: true,
ID: 0,
}
msg, err := json.Marshal(initialMessage)
if err != nil {
log.Println("error in marshalling:", err)
return
}
err = c.WriteMessage(websocket.TextMessage, msg)
if err != nil {
log.Println("write:", err)
return
}
for {
select {
case <-done:
return
case <-interrupt:
log.Println("interrupt")
err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
log.Println("write close:", err)
return
}
<-done
return
}
}
}
Unsubscribe from the Full Trade Stream
Changing the add
field in the payload from true
to false
will end the full trade stream subscription.
{
"msg_type": "STREAM_BULK",
"sec_type": "STOCK",
"req_type": "TRADE",
"add": false,
"id": 1
}
Sample output
The condition and exchange values correspond to their respective Enums.
The strike price is in 1/10th of a cent. This means that a $140 strike price is represented as
140000
.The trade sequence article might be a valuable resource.
{
"header": {
"type": "TRADE",
"status": "CONNECTED"
},
"contract": {
"security_type": "STOCK",
"root": "QQQ"
},
"trade": {
"ms_of_day": 46843302,
"sequence": 30350622,
"size": 1,
"condition": 115,
"price": 461.535,
"exchange": 57,
"date": 20240801
}
}