Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement TransformStream #3919

Open
2 tasks
Tracked by #3920
oleiade opened this issue Aug 28, 2024 · 0 comments
Open
2 tasks
Tracked by #3920

Implement TransformStream #3919

oleiade opened this issue Aug 28, 2024 · 0 comments
Labels

Comments

@oleiade
Copy link
Member

oleiade commented Aug 28, 2024

Feature Description

Description

As part of our ongoing effort to enhance k6’s data handling capabilities, we propose the implementation of TransformStream support within the k6/experimental/streams module. TransformStream is a Streams API construct that provides a straightforward way to transform data between the reading and writing stages of a stream. This addition would allow users to create more convenient processing pipelines directly within their testing scripts.

Currently, k6 supports ReadableStream, enabling efficient data consumption. However, without TransformStream, users cannot process or modify data on-the-fly as it flows through these streams. By implementing TransformStream, we enable real-time data transformation tasks such as text decoding, line parsing, encryption, and format conversion.

Practical Benefits:

  • In-Flight Data Transformation: TransformStream allows for real-time data transformation as it moves through the stream, reducing latency and memory overhead compared to batch processing.
  • Enhanced Stream Pipelines: This feature enables the chaining of multiple transformations, facilitating complex processing workflows such as decoding, parsing, and filtering data in a single pipeline.
  • Modularity and Reusability: Transformation logic can be encapsulated within TransformStream, promoting code reuse and composability across different parts of a k6 script.
  • Low Memory Footprint: By processing data chunk-by-chunk, TransformStream minimizes the need to load large amounts of data into memory simultaneously.
  • Versatile Use Cases: Supports a wide array of data manipulation needs, from text processing and compression to encryption and custom data formats.

Usage

In this example, we create a ReadableStream from a text file, use a TransformStream to decode the text from binary format, and then use another TransformStream to split the text into individual lines.

Note that k6 could actually already provide such transform stream 👇 implementations for convenience down the road, like Deno does.

// Step 1: Create a readable stream over a text file
const fileStream = getReadableStreamSomehow(); // Placeholder for actual file stream creation logic

// Step 2: Create a TransformStream to decode binary chunks into text using TextDecoder
const textDecoderStream = new TransformStream({
  start() {
    this.decoder = new TextDecoder('utf-8');
  },

  transform(chunk, controller) {
    // Decode binary chunk to text and pass it down the stream
    controller.enqueue(this.decoder.decode(chunk, { stream: true }));
  },

  flush(controller) {
    // Ensure any remaining text is pushed at the end
    controller.enqueue(this.decoder.decode());
  },
});

// Step 3: Create a TransformStream to split text into individual lines
const lineSplitterStream = new TransformStream({
  start() {
    this.buffer = '';
  },

  transform(chunk, controller) {
    // Buffer incoming text and split into lines
    this.buffer += chunk;
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Keep the last partial line in buffer
    lines.forEach(line => controller.enqueue(line));
  },

  flush(controller) {
    // Enqueue any remaining line in the buffer when the stream ends
    if (this.buffer) {
      controller.enqueue(this.buffer);
    }
  },
});

// Step 4: Pipe the file stream through the transform streams
fileStream
  .pipeThrough(textDecoderStream)
  .pipeThrough(lineSplitterStream)
  .getReader()
  .read()
  .then(({ done, value }) => {
    if (!done) {
      console.log('Line:', value); // Process the line
    }
  });

Definition of Done

  • TransformStream is exported by the k6/experimental/streams module
  • The ReadableStream.pipeThrough method is implemented

Suggested Solution (optional)

No response

Already existing or connected issues / PRs (optional)

#2974
#2978
#3918

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
1 participant