Stream and decode reports via WebSocket using the SDK (Crypto Streams)

In this guide, you'll learn how to use Chainlink Data Streams with the Streams Direct implementation and the Data Streams SDK for Go to subscribe to real-time V3 reports for Crypto streams via a WebSocket connection. You'll set up your Go project, listen for real-time reports from the Data Streams Aggregation Network, decode the report data, and log their attributes to your terminal.

Requirements

  • Git: Make sure you have Git installed. You can check your current version by running git --version in your terminal and download the latest version from the official Git website if necessary.
  • Go Version: Make sure you have Go version 1.21 or higher. You can check your current version by running go version in your terminal and download the latest version from the official Go website if necessary.
  • API Credentials: Access to the Streams Direct implementation requires API credentials. If you haven't already, contact us to request mainnet or testnet early access.

Guide

Set up your Go project

  1. Create a new directory for your project and navigate to it:

    mkdir my-data-streams-project
    cd my-data-streams-project
    
  2. Initialize a new Go module:

    go mod init my-data-streams-project
    
  3. Install the Data Streams SDK:

    go get github.com/smartcontractkit/data-streams-sdk/go
    

Establish a WebSocket connection and listen for real-time reports

  1. Create a new Go file, stream.go, in your project directory:

    touch stream.go
    
  2. Insert the following code example and save your stream.go file:

     package main
    
     import (
       "context"
       "fmt"
       "os"
       "time"
    
       streams "github.com/smartcontractkit/data-streams-sdk/go"
       feed "github.com/smartcontractkit/data-streams-sdk/go/feed"
       report "github.com/smartcontractkit/data-streams-sdk/go/report"
       v3 "github.com/smartcontractkit/data-streams-sdk/go/report/v3" // Import the v3 report schema for Crypto streams. For RWA streams, use the v4 schema.
     )
    
     func main() {
       if len(os.Args) < 2 {
         fmt.Println("Usage: go run stream.go [StreamID1] [StreamID2] ...")
         os.Exit(1)
       }
    
       // Set up the SDK client configuration
       cfg := streams.Config{
         ApiKey:    os.Getenv("API_KEY"),
         ApiSecret: os.Getenv("API_SECRET"),
         WsURL: "wss://ws.testnet-dataengine.chain.link",
         Logger: streams.LogPrintf,
       }
    
       // Create a new client
       client, err := streams.New(cfg)
       if err != nil {
         cfg.Logger("Failed to create client: %v\n", err)
         os.Exit(1)
       }
    
       // Parse the feed IDs from the command line arguments
       var ids []feed.ID
       for _, arg := range os.Args[1:] {
         var fid feed.ID
         if err := fid.FromString(arg); err != nil {
           cfg.Logger("Invalid stream ID %s: %v\n", arg, err)
           os.Exit(1)
         }
         ids = append(ids, fid)
       }
    
       ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
       defer cancel()
    
       // Subscribe to the feeds
       stream, err := client.Stream(ctx, ids)
       if err != nil {
         cfg.Logger("Failed to subscribe: %v\n", err)
         os.Exit(1)
       }
    
       defer stream.Close()
         for {
             reportResponse, err := stream.Read(context.Background())
             if err != nil {
                 cfg.Logger("Error reading from stream: %v\n", err)
                 continue
             }
    
             // Log the contents of the report before decoding
             cfg.Logger("Raw report data: %+v\n", reportResponse)
    
                 // Decode each report as it comes in
                 decodedReport, decodeErr := report.Decode[v3.Data](reportResponse.FullReport)
                 if decodeErr != nil {
                     cfg.Logger("Failed to decode report: %v\n", decodeErr)
                     continue
                 }
    
             // Log the decoded report
             cfg.Logger("\n--- Report Stream ID: %s ---\n" +
               "------------------------------------------\n" +
               "Observations Timestamp : %d\n" +
               "Benchmark Price        : %s\n" +
               "Bid                    : %s\n" +
               "Ask                    : %s\n" +
               "Valid From Timestamp   : %d\n" +
               "Expires At             : %d\n" +
               "Link Fee               : %s\n" +
               "Native Fee             : %s\n" +
               "------------------------------------------\n",
               reportResponse.FeedID.String(),
               decodedReport.Data.ObservationsTimestamp,
               decodedReport.Data.BenchmarkPrice.String(),
               decodedReport.Data.Bid.String(),
               decodedReport.Data.Ask.String(),
               decodedReport.Data.ValidFromTimestamp,
               decodedReport.Data.ExpiresAt,
               decodedReport.Data.LinkFee.String(),
               decodedReport.Data.NativeFee.String(),
             )
    
             // Also, log the stream stats
             cfg.Logger("\n--- Stream Stats ---\n" +
             stream.Stats().String() + "\n" +
             "--------------------------------------------------------------------------------------------------------------------------------------------\n",
             )
         }
     }
    
  3. Download the required dependencies and update the go.mod and go.sum files:

    go mod tidy
    
  4. Set up the SDK client configuration within stream.go with your API credentials and the WebSocket URL:

    cfg := streams.Config{
        ApiKey:    os.Getenv("API_KEY"),
        ApiSecret: os.Getenv("API_SECRET"),
        WsURL: "wss://ws.testnet-dataengine.chain.link",
        Logger: streams.LogPrintf,
    }
    
    • Set your API credentials as environment variables:

      export API_KEY="<YOUR_API_KEY>"
      export API_SECRET="<YOUR_API_SECRET>"
      

      Replace <YOUR_API_KEY> and <YOUR_API_SECRET> with your API credentials.

    • WsURL is the WebSocket URL for the Data Streams Aggregation Network. Use wss://ws.testnet-dataengine.chain.link for the testnet environment.

    See the SDK Reference page for more configuration options.

  5. For this example, you'll subscribe to the ETH/USD Data Streams crypto stream. This stream ID is 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782. See the Crypto Streams page for a complete list of available crypto assets.

    Execute your application:

    go run stream.go 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782
    

    Expect output similar to the following in your terminal:

     2024-07-31T15:34:27-05:00 Raw report data: {"fullReport":"0x0006f9b553e393ced311551efd30d1decedb63d76ad41737462e2cdbbdff15780000000000000000000000000000000000000000000000000000000035252f11000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e00000000000000000000000000000000000000000000000000000000000000220000000000000000000000000000000000000000000000000000000000000028000010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000120000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba7820000000000000000000000000000000000000000000000000000000066aa9fd30000000000000000000000000000000000000000000000000000000066aa9fd300000000000000000000000000000000000000000000000000001c23cdce76d0000000000000000000000000000000000000000000000000001ba0a27c8b79d40000000000000000000000000000000000000000000000000000000066abf1530000000000000000000000000000000000000000000000af35b91cbc421fe2800000000000000000000000000000000000000000000000af354910dbd1830c200000000000000000000000000000000000000000000000af3629289cb2be3f800000000000000000000000000000000000000000000000000000000000000002e03c8b14707a80c59922eeb6b89c79dd8ac6b4b925203b3fe2f0903ba6765934aaf6c4170522c0e54abecb90e7ba7b26e27a83b12740e6a6fd5835c5ece9034c000000000000000000000000000000000000000000000000000000000000000252088e89df570d7022fd2bfc71eb53bfe72423ccba1834a785d80c278b334fab65d4acced307504358554844c2007ab0322b7ab2b7bfa2bc39563bf823654a36","feedID":"0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782","validFromTimestamp":1722458067,"observationsTimestamp":1722458067}
    
     2024-07-31T15:34:27-05:00
     --- Report Stream ID: 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782 ---
     ------------------------------------------
     Observations Timestamp : 1722458067
     Benchmark Price        : 3232051369848762000000
     Bid                    : 3232019831592780500000
     Ask                    : 3232082908104743600000
     Valid From Timestamp   : 1722458067
     Expires At             : 1722544467
     Link Fee               : 7776444105849300
     Native Fee             : 30940102293200
     ------------------------------------------
    
     2024-07-31T15:34:27-05:00
     --- Stream Stats ---
     accepted: 1, deduplicated: 0, total_received 1, partial_reconnects: 0, full_reconnects: 0, configured_connections: 1, active_connections 1
     --------------------------------------------------------------------------------------------------------------------------------------------
    
     2024-07-31T15:34:28-05:00 Raw report data: {"fullReport":"0x0006f9b553e393ced311551efd30d1decedb63d76ad41737462e2cdbbdff15780000000000000000000000000000000000000000000000000000000035252f14000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e00000000000000000000000000000000000000000000000000000000000000220000000000000000000000000000000000000000000000000000000000000028000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000120000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba7820000000000000000000000000000000000000000000000000000000066aa9fd40000000000000000000000000000000000000000000000000000000066aa9fd400000000000000000000000000000000000000000000000000001c23c416de34000000000000000000000000000000000000000000000000001ba0909c32d3c00000000000000000000000000000000000000000000000000000000066abf1540000000000000000000000000000000000000000000000af35f59d91552300000000000000000000000000000000000000000000000000af34696c66686640800000000000000000000000000000000000000000000000af3c6a5680c2a6000000000000000000000000000000000000000000000000000000000000000000020d3c5953a51793330c4bb5082d0e82eca98281d340d56088b5707dbc77e5c106311585b943ced71c62a3e6b100dc9316c3580354aee92626280228dd9b6a2c3900000000000000000000000000000000000000000000000000000000000000026398ed0026b877ba17280888f1c7f93f42ca4c3148cf33761412af03b19c08880e4ee75f222eb928db5429fc4339aa1e275bf5a5ffeb6345aa0acef594024abc","feedID":"0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782","validFromTimestamp":1722458068,"observationsTimestamp":1722458068}
    
     2024-07-31T15:34:28-05:00
     --- Report Stream ID: 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782 ---
     ------------------------------------------
     Observations Timestamp : 1722458068
     Benchmark Price        : 3232068400000000000000
     Bid                    : 3231956881848792400000
     Ask                    : 3232533600000000000000
     Valid From Timestamp   : 1722458068
     Expires At             : 1722544468
     Link Fee               : 7776367327499200
     Native Fee             : 30939939266100
     ------------------------------------------
    
     2024-07-31T15:34:28-05:00
     --- Stream Stats ---
     accepted: 2, deduplicated: 0, total_received 2, partial_reconnects: 0, full_reconnects: 0, configured_connections: 1, active_connections 1
     --------------------------------------------------------------------------------------------------------------------------------------------
    
     2024-07-31T15:34:29-05:00 Raw report data: {"fullReport":"0x0006f9b553e393ced311551efd30d1decedb63d76ad41737462e2cdbbdff15780000000000000000000000000000000000000000000000000000000035252f19000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e00000000000000000000000000000000000000000000000000000000000000220000000000000000000000000000000000000000000000000000000000000028000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000120000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba7820000000000000000000000000000000000000000000000000000000066aa9fd50000000000000000000000000000000000000000000000000000000066aa9fd500000000000000000000000000000000000000000000000000001c2232164340000000000000000000000000000000000000000000000000001ba02d9e17e83c0000000000000000000000000000000000000000000000000000000066abf1550000000000000000000000000000000000000000000000af3fbd367bea5ac0000000000000000000000000000000000000000000000000af3f1f78eb5653c0000000000000000000000000000000000000000000000000af405a99196de7800000000000000000000000000000000000000000000000000000000000000000020a7b2c4de654a6fb2e0b9c3706521a94bb852c705fe03e682da43301986c229f99b40a47c34b2d23c51e6323274d68b5c6d0d36dbc02586233d50dfc3ef193700000000000000000000000000000000000000000000000000000000000000002364b1b5d922cfe20faa94011a22324ed452fe17a0c1d1475a468974a39419aae33a027865c4a2738fbd59f2ce3a1c72435054a72084b4802f205fe7a690d1ecc","feedID":"0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782","validFromTimestamp":1722458069,"observationsTimestamp":1722458069}
    
     2024-07-31T15:34:29-05:00
     --- Report Stream ID: 0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782 ---
     ------------------------------------------
     Observations Timestamp : 1722458069
     Benchmark Price        : 3232773100000000000000
     Bid                    : 3232728700000000000000
     Ask                    : 3232817400000000000000
     Valid From Timestamp   : 1722458069
     Expires At             : 1722544469
     Link Fee               : 7775942157527100
     Native Fee             : 30933194785600
     ------------------------------------------
    
     2024-07-31T15:34:29-05:00
     --- Stream Stats ---
     accepted: 3, deduplicated: 0, total_received 3, partial_reconnects: 0, full_reconnects: 0, configured_connections: 1, active_connections 1
     --------------------------------------------------------------------------------------------------------------------------------------------
    
    [...]
    

Decoded report details

The decoded report details include:

AttributeValueDescription
Stream ID0x000359843a543ee2fe414dc14c7e7920ef10f4372990b79d6361cdc0dd1ba782The unique identifier for the stream. In this example, the stream is for ETH/USD.
Observations Timestamp1722458069The timestamp indicating when the data was captured.
Benchmark Price3232773100000000000000The observed price in the report, with 18 decimals. For readability: 3,232.7731000000000 USD per ETH.
Bid3232728700000000000000The highest price a buyer is willing to pay for an asset, with 18 decimals. For readability: 3,232.7287000000000 USD per ETH. Learn more about the Bid price.
Ask3232817400000000000000The lowest price a seller is willing to accept for an asset, with 18 decimals. For readability: 3,232.8174000000000 USD per ETH. Learn more about the Ask price.
Valid From Timestamp1722458069The start validity timestamp for the report, indicating when the data becomes relevant.
Expires At1722544469The expiration timestamp of the report, indicating the point at which the data becomes outdated.
Link Fee7775942157527100The fee to pay in LINK tokens for the onchain verification of the report data. With 18 decimals. For readability: 0.0077759421575271 LINK.
Native Fee30933194785600The fee to pay in the native blockchain token (e.g., ETH on Ethereum) for the onchain verification of the report data. With 18 decimals. Note: This example fee is not indicative of actual fees.

Payload for onchain verification

In this guide, you log and decode the fullReport payload to extract the report data. In a production environment, you should verify the data onchain to ensure its integrity and authenticity. Refer to the Verify report data onchain guide.

Explanation

Establishing a WebSocket connection and listening for reports

Your application uses the Stream function in the Data Streams SDK's client package to establish a real-time WebSocket connection with the Data Streams Aggregation Network.

Once the WebSocket connection is established, your application subscribes to one or more streams by passing an array of feed.IDs to the Stream function. This subscription lets the client receive real-time updates whenever new report data is available for the specified streams.

Decoding a report

As data reports arrive via the established WebSocket connection, they are processed in real-time:

  • Reading streams: The Read method on the returned Stream object is continuously called within a loop. This method blocks until new data is available, ensuring that all incoming reports are captured as soon as they are broadcasted.

  • Decoding reports: For each received report, the SDK's Decode function parses and transforms the raw data into a structured format (v3.Data for crypto streams). This decoded data includes data such as the benchmark price and bid and ask prices.

Handling the decoded data

In this example, the application logs the structured report data to the terminal. However, this data can be used for further processing, analysis, or display in your own application.

What's next

Get the latest Chainlink content straight to your inbox.