The goal of SSEparser is to provide robust functionality to parse Server-Sent Events and to build on top of it.
You can install SEEparser
from CRAN like so:
install.packages("gptstudio")
Alternatively, you can install the development version like so:
::pak("calderonsamuel/SSEparser") pak
The parse_sse()
function takes a string containing a
server-sent event and converts it to a R list.
library(SSEparser)
<- "data: test\nevent: message\nid: 123\n\n"
event
parse_sse(event)
#> [[1]]
#> [[1]]$data
#> [1] "test"
#>
#> [[1]]$event
#> [1] "message"
#>
#> [[1]]$id
#> [1] "123"
Comments are usually received in a line starting with a colon. They are not parsed.
<- "data: test\n: comment\nevent: example\n\n"
with_comment
parse_sse(with_comment)
#> [[1]]
#> [[1]]$data
#> [1] "test"
#>
#> [[1]]$event
#> [1] "example"
parse_sse()
wraps the SSEparser
R6 class,
which is also exported to be used with real-time streaming data. The
following code handles a request with MIME type “text/event-stream”.
<- SSEparser$new()
parser <- httr2::request("https://postman-echo.com/server-events/3") %>%
response ::req_body_json(data = list(
httr2event = "message",
request = "POST"
%>%
)) ::req_perform_stream(callback = \(x) {
httr2<- rawToChar(x)
event $parse_sse(event)
parserTRUE
})
str(parser$events)
#> List of 3
#> $ :List of 3
#> ..$ event: chr "message"
#> ..$ data : chr "{\"event\":\"message\",\"request\":\"POST\"}"
#> ..$ id : chr "1"
#> $ :List of 3
#> ..$ event: chr "message"
#> ..$ data : chr "{\"event\":\"message\",\"request\":\"POST\"}"
#> ..$ id : chr "2"
#> $ :List of 3
#> ..$ event: chr "error"
#> ..$ data : chr "{\"event\":\"message\",\"request\":\"POST\"}"
#> ..$ id : chr "3"
Following the previous example, it should be useful to parse the
content of every data
field to be also an R list instead of
a JSON string. For that, we can create a new R6 class which inherits
from SSEparser
. We just need to overwrite the
append_parsed_sse()
method.
<- R6::R6Class(
CustomParser classname = "CustomParser",
inherit = SSEparser,
public = list(
initialize = function() {
$initialize()
super
},append_parsed_sse = function(parsed_event) {
$data <- jsonlite::fromJSON(parsed_event$data)
parsed_event$events = c(self$events, list(parsed_event))
selfinvisible(self)
}
) )
Notice that the only thing we are modifying is the parsing of the
data field, not the parsing of the event itself. This is the original
method from SSEparser
:
$public_methods$append_parsed_sse
SSEparser#> function (parsed_event)
#> {
#> self$events <- c(self$events, list(parsed_event))
#> invisible(self)
#> }
#> <bytecode: 0x0000022528581ce8>
#> <environment: namespace:SSEparser>
CustomParser
uses jsonlite::fromJSON()
to
parse the data field of every chunk in the event stream. We can now use
our custom class with the previous request1.
<- CustomParser$new()
parser <- httr2::request("https://postman-echo.com/server-events/3") %>%
response ::req_body_json(data = list(
httr2event = "message",
request = "POST"
%>%
)) ::req_perform_stream(callback = \(x) {
httr2<- rawToChar(x)
event $parse_sse(event)
parserTRUE
})
str(parser$events)
#> List of 3
#> $ :List of 3
#> ..$ event: chr "message"
#> ..$ data :List of 2
#> .. ..$ event : chr "message"
#> .. ..$ request: chr "POST"
#> ..$ id : chr "1"
#> $ :List of 3
#> ..$ event: chr "message"
#> ..$ data :List of 2
#> .. ..$ event : chr "message"
#> .. ..$ request: chr "POST"
#> ..$ id : chr "2"
#> $ :List of 3
#> ..$ event: chr "ping"
#> ..$ data :List of 2
#> .. ..$ event : chr "message"
#> .. ..$ request: chr "POST"
#> ..$ id : chr "3"
Now instead of a JSON string we can have an R list in the data field while the stream is still in process.
This endpoint returns random event field names for each chunk in every request, so the response will not be exactly the same.↩︎