Stream live microphone audio to Deepgram for real-time cloud transcription using decibri and the official Deepgram SDK. This page covers two Deepgram streaming products, Nova-3 and Flux, in both Python and Node.js. Tabs above each code block switch between languages.
This integration captures live audio from your microphone using decibri and streams it to Deepgram's cloud API over a WebSocket. Two products are documented here: Nova-3 (single-utterance general transcription) and Flux (multi-turn voice-agent transcription). Both ship in the same SDK packages; pick the one that matches your use case. There is no model download, no local inference, and no format conversion required.
Nova-3 and Flux both stream live microphone audio over a WebSocket, but they target different shapes of speech and emit different result events.
Nova-3 is a general-purpose, single-utterance transcription model on the v1 streaming API. It emits Results events as audio is processed, with an is_final flag separating progressive partials from final transcripts. Pick Nova-3 for dictation, voice notes, captioning, or any flow where the goal is to transcribe one utterance at a time.
Flux is a multi-turn, voice-agent oriented model on the v2 streaming API. It emits EndOfTurn events with a turn index, the transcript for that turn, and an end-of-turn confidence score. Turn boundaries are configurable via eot_threshold and eot_timeout_ms. Pick Flux for conversational interfaces, voice agents, or any flow where the application needs to react to natural turn-taking.
The walkthroughs below cover both. The Prerequisites section is shared; the Nova-3 and Flux sections are independent and either can be copied on its own.
A .env file in your project root containing your Deepgram API key:
DEEPGRAM_API_KEY=your_key_here
The dotenv package (python-dotenv on PyPI, dotenv on npm) loads your API key from the .env file. If you set environment variables another way, you can skip it. The install command above switches with the language tabs in the code blocks below.
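A missing or empty key tends to surface only later, as a WebSocket that never opens, so it can help to check for it at startup. The sketch below is one way to do that. `require_api_key` is a hypothetical helper, not part of decibri or the Deepgram SDK, and it accepts the environment as a parameter so it can be exercised without touching the real one.

```python
import os
from typing import Mapping, Optional


def require_api_key(env: Optional[Mapping[str, str]] = None,
                    name: str = "DEEPGRAM_API_KEY") -> str:
    """Return the API key from the environment or raise a clear error.

    Hypothetical helper: not provided by decibri or the Deepgram SDK.
    """
    env = os.environ if env is None else env
    key = (env.get(name) or "").strip()
    if not key:
        raise RuntimeError(
            f"{name} is not set. Add it to your .env file or export it "
            "before starting the script."
        )
    return key
```

Called after `load_dotenv()`, the returned string can be passed to `DeepgramClient(api_key=...)` in place of the direct `os.environ[...]` lookup.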
The same Deepgram SDK package supports both Nova-3 and Flux. No model download is required. All processing happens in Deepgram's cloud.
Import decibri, the Deepgram SDK, and dotenv. Create a DeepgramClient with your API key. The same DeepgramClient is used for both products; the product is selected at connect time.
import os
from dotenv import load_dotenv
import decibri
from deepgram import DeepgramClient
from deepgram.core.events import EventType
load_dotenv()
client = DeepgramClient(api_key=os.environ["DEEPGRAM_API_KEY"])
require('dotenv').config();
const { Microphone } = require('decibri');
const { DeepgramClient } = require('@deepgram/sdk');
const client = new DeepgramClient({ apiKey: process.env.DEEPGRAM_API_KEY });
Open a WebSocket to Deepgram and select the Nova-3 model. The audio parameters (encoding, sample_rate) must match what decibri emits.
In Python, client.listen.v1.connect(...) returns a context manager; use with ... as connection: so the WebSocket is closed cleanly when the block exits. The remaining Python steps in this walkthrough sit inside that with block.
In Node, client.listen.v1.connect(...) returns a promise that resolves to a socket. The socket is built with startClosed: true, so you must call socket.connect() before awaiting socket.waitForOpen(). Without that call, waitForOpen() blocks indefinitely. Wrap waitForOpen() in a Promise.race with a timeout because the SDK's ReconnectingWebSocket retries silently on auth or network failures and would otherwise hang quietly.
In Node, the connect args must also include Authorization: `Token ${KEY}`. The DeepgramClient constructor's apiKey option only flows to REST endpoints; the WebSocket transport ignores it. Omitting the Authorization header in the Node connect args is the single most common reason a Deepgram WebSocket fails to open. The Python SDK wires the constructor's api_key through to the WebSocket transport automatically, so the Python connect args do not need an Authorization entry.
with client.listen.v1.connect(
    model="nova-3",
    encoding="linear16",
    sample_rate=16000,
) as connection:
    # Register handlers, open the microphone, stream audio, and shut
    # down inside this block (see steps below).
    ...

const KEY = process.env.DEEPGRAM_API_KEY;
const socket = await client.listen.v1.connect({
  model: 'nova-3',
  encoding: 'linear16',
  sample_rate: 16000,
  Authorization: `Token ${KEY}`,
});
socket.connect();
await Promise.race([
  socket.waitForOpen(),
  new Promise((_, reject) =>
    setTimeout(() => reject(new Error('Deepgram WebSocket open timeout after 10s')), 10000)
  ),
]);
Register handlers before streaming audio. Both SDKs deliver each result as one event. In Python, events use the EventType enum from deepgram.core.events; on v1, message payloads are Pydantic ListenV1Results objects accessed with attribute syntax. In Node, the socket exposes string events ('open', 'message', 'error', 'close'); message payloads are plain JS objects with direct property access.
def on_open(_):
    print("Deepgram connected.")

def on_message(result):
    if result.type != "Results" or not result.is_final:
        return
    alts = result.channel.alternatives if result.channel else []
    if not alts:
        return
    transcript = alts[0].transcript
    if transcript:
        print(transcript)

connection.on(EventType.OPEN, on_open)
connection.on(EventType.MESSAGE, on_message)

socket.on('open', () => {
  console.log('Deepgram connected.');
});
socket.on('message', (msg) => {
  if (msg && msg.type === 'Results' && msg.is_final) {
    const alts = (msg.channel && msg.channel.alternatives) || [];
    const transcript = alts[0] && alts[0].transcript;
    if (transcript) {
      console.log(transcript);
    }
  }
});
socket.on('error', (err) => {
  console.error('Deepgram error:', err);
});
socket.on('close', () => {
  console.log('Connection closed.');
});
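The filtering logic in the Python handler above can be pulled out into a pure function, which makes it easy to check without a live connection. This is a sketch: `final_transcript` and `fake_result` are hypothetical names, and the `SimpleNamespace` objects are stand-ins that only mimic the attribute layout the handler relies on (`type`, `is_final`, `channel.alternatives[0].transcript`), not real `ListenV1Results` instances.

```python
from types import SimpleNamespace
from typing import Any, Optional


def final_transcript(result: Any) -> Optional[str]:
    """Return the transcript of a final Results message, else None."""
    if getattr(result, "type", None) != "Results":
        return None
    if not getattr(result, "is_final", False):
        return None
    channel = getattr(result, "channel", None)
    alternatives = getattr(channel, "alternatives", None) or []
    if not alternatives:
        return None
    # An empty transcript (silence) is treated the same as no transcript.
    return alternatives[0].transcript or None


def fake_result(text: str, is_final: bool) -> SimpleNamespace:
    """Build a stand-in with the attribute layout the handler expects."""
    alt = SimpleNamespace(transcript=text)
    return SimpleNamespace(type="Results", is_final=is_final,
                           channel=SimpleNamespace(alternatives=[alt]))
```

With this in place, `on_message` reduces to calling `final_transcript(result)` and printing the value when it is not `None`.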
Create a decibri instance at 16 kHz mono. Decibri's default 16-bit signed integer PCM matches Deepgram's linear16 encoding directly. In Python, Microphone is constructed here but capture starts only when the with mic: block is entered (see the shutdown step below). In Node, new Microphone(...) begins capture as soon as it is constructed.
mic = decibri.Microphone(sample_rate=16000, channels=1)
const mic = new Microphone({ sampleRate: 16000, channels: 1 });
Read chunks from decibri and forward them to Deepgram. Deepgram recommends streaming buffers between 20 and 100 ms; batching to 100 ms windows (3200 bytes of int16 mono at 16 kHz) gives the most efficient server-side processing. When the stream ends, any remainder shorter than a full window is sent only if it holds at least 50 ms of audio (1600 bytes); a shorter tail is dropped. Decibri's live microphone capture delivers audio at real-time pace, and Deepgram processes it as it arrives. Decibri produces raw int16 PCM that Deepgram accepts as-is, so no format conversion is needed.
BATCH_BYTES = 3200  # 100 ms of int16 mono at 16 kHz
MIN_BYTES = 1600    # 50 ms tail-drop

def audio_iter():
    buffer = bytearray()
    for chunk in mic:
        buffer.extend(chunk)
        while len(buffer) >= BATCH_BYTES:
            yield bytes(buffer[:BATCH_BYTES])
            del buffer[:BATCH_BYTES]
    if len(buffer) >= MIN_BYTES:
        yield bytes(buffer)

const BATCH_BYTES = 3200; // 100 ms of int16 mono at 16 kHz
const MIN_BYTES = 1600;   // 50 ms tail-drop
let buffer = Buffer.alloc(0);
mic.on('data', (chunk) => {
  buffer = Buffer.concat([buffer, chunk]);
  while (buffer.length >= BATCH_BYTES) {
    socket.sendMedia(buffer.subarray(0, BATCH_BYTES));
    buffer = buffer.subarray(BATCH_BYTES);
  }
});
mic.on('error', (err) => {
  console.error('Mic error:', err.message);
});
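The two byte constants follow directly from the audio format: 16,000 samples per second, 2 bytes per int16 sample, one channel. The sketch below derives them and runs the same rebatching logic against in-memory chunks instead of a microphone, which is a convenient way to check the windowing offline. `bytes_for_ms` and `rebatch` are hypothetical names introduced for illustration, and the 1000-byte chunk size is arbitrary.

```python
from typing import Iterable, Iterator

SAMPLE_RATE = 16000   # Hz, matches the microphone and the connect args
SAMPLE_WIDTH = 2      # bytes per int16 sample
CHANNELS = 1          # mono


def bytes_for_ms(ms: int) -> int:
    """Size in bytes of `ms` milliseconds of int16 mono PCM at 16 kHz."""
    return SAMPLE_RATE * SAMPLE_WIDTH * CHANNELS * ms // 1000


def rebatch(chunks: Iterable[bytes], batch_bytes: int,
            min_bytes: int) -> Iterator[bytes]:
    """Regroup arbitrary-sized chunks into fixed-size windows.

    A trailing remainder is yielded only if it holds at least
    `min_bytes`; anything shorter is dropped.
    """
    buffer = bytearray()
    for chunk in chunks:
        buffer.extend(chunk)
        while len(buffer) >= batch_bytes:
            yield bytes(buffer[:batch_bytes])
            del buffer[:batch_bytes]
    if len(buffer) >= min_bytes:
        yield bytes(buffer)


# Seven 1000-byte chunks of silence stand in for microphone output.
fake_chunks = [bytes(1000)] * 7
sizes = [len(b) for b in rebatch(fake_chunks, bytes_for_ms(100), bytes_for_ms(50))]
print(sizes)  # prints [3200, 3200]; the 600-byte tail is below 50 ms and is dropped
```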
When the audio stream ends, call send_finalize (Python) or sendFinalize (Node) to flush any unprocessed audio in Deepgram's server-side pipeline. Without this, the last few hundred milliseconds of speech can be missed because Deepgram's endpointing has not yet closed the utterance. Finalize emits a final Results message with from_finalize: true. Wait a short grace period (around 2 seconds) for that message to arrive before tearing the connection down.
This step is specific to Nova-3 (v1). Flux uses EndOfTurn events for turn boundaries and does not need an explicit Finalize call.
for chunk in audio_iter():
    connection.send_media(chunk)
connection.send_finalize()
time.sleep(2.0)  # grace for the from_finalize Results message

if (buffer.length >= MIN_BYTES) {
  socket.sendMedia(buffer);
}
socket.sendFinalize({ type: 'Finalize' });
await new Promise((r) => setTimeout(r, 2000)); // grace for the from_finalize Results message
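The fixed 2-second sleep is the simplest option. An alternative, sketched below, is to return as soon as the flagged message arrives and fall back to the timeout otherwise. `FinalizeWaiter` is a hypothetical helper, and it assumes the Python result object exposes `from_finalize` as an attribute; `getattr` is used so that a missing attribute simply means the waiter runs to its timeout.

```python
import threading
from types import SimpleNamespace
from typing import Any


class FinalizeWaiter:
    """Signal when a Results message flagged from_finalize has arrived."""

    def __init__(self) -> None:
        self._seen = threading.Event()

    def on_message(self, result: Any) -> None:
        # Call this from the message handler for every incoming message.
        if (getattr(result, "type", None) == "Results"
                and getattr(result, "from_finalize", False)):
            self._seen.set()

    def wait(self, timeout: float = 2.0) -> bool:
        """Block until the flagged message arrives or the timeout expires.

        Returns True if the message was seen, False on timeout.
        """
        return self._seen.wait(timeout)
```

In the streaming code, `waiter.wait(2.0)` would take the place of `time.sleep(2.0)` after `send_finalize()`, with `waiter.on_message(result)` called at the top of the message handler.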
Close the WebSocket and stop the microphone when the user presses Ctrl+C. In Python, connection.send_close_stream() tells Deepgram you are done sending audio but does not close the local websocket. The Python SDK does not currently expose a public close method on V1SocketClient, so to break out of start_listening() you must close the underlying websocket directly via connection._websocket.close(). This is a known SDK quirk; future SDK releases may add a public close method.
In Node, socket.sendCloseStream({ type: 'CloseStream' }) followed by socket.close() is the public API; the 'close' event fires once the server has acknowledged.
The Python pattern below runs the send-side work on a background thread because connection.start_listening() blocks the main thread until the socket closes. The thread is required to keep audio flowing while the main thread receives events.
import threading
import time

print("Listening... (Ctrl+C to stop)")
try:
    with mic:
        def stream():
            try:
                for chunk in audio_iter():
                    connection.send_media(chunk)
                connection.send_finalize()
                time.sleep(2.0)
            finally:
                connection.send_close_stream()
                # SDK quirk: V1SocketClient has no public close method;
                # force-close the underlying websocket so start_listening
                # returns. Future SDK releases may add a public close.
                connection._websocket.close()

        sender = threading.Thread(target=stream, daemon=True)
        sender.start()
        connection.start_listening()  # blocks until the websocket closes
        sender.join(timeout=7.0)
except KeyboardInterrupt:
    print("\nStopping...")

const closed = new Promise((resolve) => socket.on('close', () => resolve()));

process.on('SIGINT', async () => {
  console.log('\nStopping...');
  mic.stop();
  if (buffer.length >= MIN_BYTES) {
    socket.sendMedia(buffer);
  }
  socket.sendFinalize({ type: 'Finalize' });
  await new Promise((r) => setTimeout(r, 2000));
  socket.sendCloseStream({ type: 'CloseStream' });
  socket.close();
  await closed;
  process.exit(0);
});

console.log('Listening... (Ctrl+C to stop)');
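The threading shape of the Python shutdown code can be seen in isolation with a stand-in connection. In the sketch below, `FakeConnection` is entirely hypothetical: it only mimics the one behaviour that matters here, a `start_listening()` call that blocks until the socket is closed, and shares no code with the Deepgram SDK.

```python
import threading
from typing import List


class FakeConnection:
    """Stand-in for the SDK socket; only mimics the blocking behaviour."""

    def __init__(self) -> None:
        self.sent: List[bytes] = []
        self._closed = threading.Event()

    def send_media(self, chunk: bytes) -> None:
        self.sent.append(chunk)

    def close(self) -> None:
        self._closed.set()

    def start_listening(self) -> None:
        # Blocks the calling thread until close() is called.
        self._closed.wait()


def run_session(connection: FakeConnection, chunks: List[bytes]) -> None:
    def stream() -> None:
        try:
            for chunk in chunks:
                connection.send_media(chunk)
        finally:
            # Closing from the sender thread is what unblocks the listener.
            connection.close()

    sender = threading.Thread(target=stream, daemon=True)
    sender.start()
    connection.start_listening()  # the main thread waits here
    sender.join(timeout=1.0)


conn = FakeConnection()
run_session(conn, [b"a" * 3200, b"b" * 3200])
print(len(conn.sent))  # prints 2
```

If the sender never closed the connection, `start_listening()` would never return, which is why the close call sits in a `finally` block.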
Same client construction as Nova-3. The product is selected at connect time, not on the client.
import os
from dotenv import load_dotenv
import decibri
from deepgram import DeepgramClient
from deepgram.core.events import EventType
load_dotenv()
client = DeepgramClient(api_key=os.environ["DEEPGRAM_API_KEY"])
require('dotenv').config();
const { Microphone } = require('decibri');
const { DeepgramClient } = require('@deepgram/sdk');
const client = new DeepgramClient({ apiKey: process.env.DEEPGRAM_API_KEY });
Open a WebSocket to Deepgram and select the Flux model. The connect args include eot_threshold (end-of-turn probability threshold, default 0.7) and eot_timeout_ms (maximum wait for end-of-turn, default 5000), which together configure the turn-detection behaviour.
The socket.connect() and Authorization caveats described in the Nova-3 connect step apply identically here: in Node you must call socket.connect() before awaiting socket.waitForOpen(), you should wrap waitForOpen() in a Promise.race with a timeout, and the connect args must include an Authorization header. The Python connect args do not need one.
with client.listen.v2.connect(
    model="flux-general-en",
    eot_threshold=0.7,
    eot_timeout_ms=5000,
    encoding="linear16",
    sample_rate=16000,
) as connection:
    # Register handlers, open the microphone, stream audio, and shut
    # down inside this block (see steps below).
    ...

const KEY = process.env.DEEPGRAM_API_KEY;
const socket = await client.listen.v2.connect({
  model: 'flux-general-en',
  encoding: 'linear16',
  sample_rate: 16000,
  eot_threshold: 0.7,
  eot_timeout_ms: 5000,
  Authorization: `Token ${KEY}`,
});
socket.connect();
await Promise.race([
  socket.waitForOpen(),
  new Promise((_, reject) =>
    setTimeout(() => reject(new Error('Deepgram WebSocket open timeout after 10s')), 10000)
  ),
]);
Register handlers before streaming audio. In Python, Flux's v2 SDK delivers each message as a plain dict (not a Pydantic object as v1 does); use .get(...) rather than attribute access. In Node, messages are plain JS objects with direct property access for both v1 and v2.
def on_open(_):
    print("Deepgram (Flux) connected.")

def on_message(result):
    if result.get("event") != "EndOfTurn":
        return
    print(f"[turn {result.get('turn_index')}] {result.get('transcript') or ''} "
          f"(conf={result.get('end_of_turn_confidence')})")

connection.on(EventType.OPEN, on_open)
connection.on(EventType.MESSAGE, on_message)

socket.on('open', () => {
  console.log('Deepgram (Flux) connected.');
});
socket.on('message', (msg) => {
  if (msg && msg.event === 'EndOfTurn') {
    console.log(`[turn ${msg.turn_index}] ${msg.transcript || ''} (conf=${msg.end_of_turn_confidence})`);
  }
});
socket.on('error', (err) => {
  console.error('Deepgram error:', err);
});
socket.on('close', () => {
  console.log('Connection closed.');
});
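Because the Python Flux messages are plain dicts, the formatting step is easy to separate from the printing and check on its own. The sketch below does that. `format_turn` is a hypothetical helper, and the sample values in the usage line are made up for illustration rather than captured from Deepgram.

```python
from typing import Any, Mapping, Optional


def format_turn(message: Mapping[str, Any]) -> Optional[str]:
    """Format an EndOfTurn message for display; return None for other events."""
    if message.get("event") != "EndOfTurn":
        return None
    transcript = message.get("transcript") or ""
    confidence = message.get("end_of_turn_confidence")
    return f"[turn {message.get('turn_index')}] {transcript} (conf={confidence})"


# Illustrative message only; field values are invented.
sample = {
    "event": "EndOfTurn",
    "turn_index": 0,
    "transcript": "hello there",
    "end_of_turn_confidence": 0.82,
}
print(format_turn(sample))  # prints [turn 0] hello there (conf=0.82)
```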
Same microphone construction as Nova-3. Create a decibri instance at 16 kHz mono.
mic = decibri.Microphone(sample_rate=16000, channels=1)
const mic = new Microphone({ sampleRate: 16000, channels: 1 });
Same batching as the Nova-3 walkthrough. Decibri produces raw int16 PCM at the rate Deepgram expects, so no format conversion is needed.
BATCH_BYTES = 3200  # 100 ms of int16 mono at 16 kHz
MIN_BYTES = 1600    # 50 ms tail-drop

def audio_iter():
    buffer = bytearray()
    for chunk in mic:
        buffer.extend(chunk)
        while len(buffer) >= BATCH_BYTES:
            yield bytes(buffer[:BATCH_BYTES])
            del buffer[:BATCH_BYTES]
    if len(buffer) >= MIN_BYTES:
        yield bytes(buffer)

const BATCH_BYTES = 3200; // 100 ms of int16 mono at 16 kHz
const MIN_BYTES = 1600;   // 50 ms tail-drop
let buffer = Buffer.alloc(0);
mic.on('data', (chunk) => {
  buffer = Buffer.concat([buffer, chunk]);
  while (buffer.length >= BATCH_BYTES) {
    socket.sendMedia(buffer.subarray(0, BATCH_BYTES));
    buffer = buffer.subarray(BATCH_BYTES);
  }
});
mic.on('error', (err) => {
  console.error('Mic error:', err.message);
});
Same shutdown shape as Nova-3, minus the Finalize call. Wait a short grace period (around 2 seconds) after the last audio chunk for any late EndOfTurn events to arrive, then close the connection. The Python SDK quirk (no public close on V2SocketClient, requires connection._websocket.close() to break out of start_listening()) applies to v2 as well as v1.
import threading
import time

print("Listening... (Ctrl+C to stop)")
try:
    with mic:
        def stream():
            try:
                for chunk in audio_iter():
                    connection.send_media(chunk)
                # Grace for late EndOfTurn events to arrive after the
                # last audio chunk hits the server.
                time.sleep(2.0)
            finally:
                connection.send_close_stream()
                # Same SDK quirk as Nova-3: V2SocketClient has no
                # public close method; force-close the underlying
                # websocket so start_listening returns.
                connection._websocket.close()

        sender = threading.Thread(target=stream, daemon=True)
        sender.start()
        connection.start_listening()
        sender.join(timeout=7.0)
except KeyboardInterrupt:
    print("\nStopping...")

const closed = new Promise((resolve) => socket.on('close', () => resolve()));

process.on('SIGINT', async () => {
  console.log('\nStopping...');
  mic.stop();
  if (buffer.length >= MIN_BYTES) {
    socket.sendMedia(buffer);
  }
  // Grace for late EndOfTurn events to arrive after the last audio
  // chunk hits the server.
  await new Promise((r) => setTimeout(r, 2000));
  socket.sendCloseStream({ type: 'CloseStream' });
  socket.close();
  await closed;
  process.exit(0);
});

console.log('Listening... (Ctrl+C to stop)');
import os
import threading
import time

from dotenv import load_dotenv
import decibri
from deepgram import DeepgramClient
from deepgram.core.events import EventType

BATCH_BYTES = 3200  # 100 ms of int16 mono at 16 kHz
MIN_BYTES = 1600    # 50 ms tail-drop

load_dotenv()
client = DeepgramClient(api_key=os.environ["DEEPGRAM_API_KEY"])

with client.listen.v1.connect(
    model="nova-3",
    encoding="linear16",
    sample_rate=16000,
) as connection:
    def on_open(_):
        print("Deepgram connected.")

    def on_message(result):
        if result.type != "Results" or not result.is_final:
            return
        alts = result.channel.alternatives if result.channel else []
        if not alts:
            return
        transcript = alts[0].transcript
        if transcript:
            print(transcript)

    connection.on(EventType.OPEN, on_open)
    connection.on(EventType.MESSAGE, on_message)

    mic = decibri.Microphone(sample_rate=16000, channels=1)

    def audio_iter():
        buffer = bytearray()
        for chunk in mic:
            buffer.extend(chunk)
            while len(buffer) >= BATCH_BYTES:
                yield bytes(buffer[:BATCH_BYTES])
                del buffer[:BATCH_BYTES]
        if len(buffer) >= MIN_BYTES:
            yield bytes(buffer)

    print("Listening... (Ctrl+C to stop)")
    try:
        with mic:
            def stream():
                try:
                    for chunk in audio_iter():
                        connection.send_media(chunk)
                    connection.send_finalize()
                    time.sleep(2.0)
                finally:
                    connection.send_close_stream()
                    # SDK quirk: V1SocketClient has no public close method.
                    connection._websocket.close()

            sender = threading.Thread(target=stream, daemon=True)
            sender.start()
            connection.start_listening()
            sender.join(timeout=7.0)
    except KeyboardInterrupt:
        print("\nStopping...")
'use strict';
require('dotenv').config();
const { Microphone } = require('decibri');
const { DeepgramClient } = require('@deepgram/sdk');

const BATCH_BYTES = 3200; // 100 ms of int16 mono at 16 kHz
const MIN_BYTES = 1600;   // 50 ms tail-drop

const run = async () => {
  const KEY = process.env.DEEPGRAM_API_KEY;
  const client = new DeepgramClient({ apiKey: KEY });
  const socket = await client.listen.v1.connect({
    model: 'nova-3',
    encoding: 'linear16',
    sample_rate: 16000,
    Authorization: `Token ${KEY}`,
  });

  socket.on('open', () => {
    console.log('Deepgram connected.');
  });
  socket.on('message', (msg) => {
    if (msg && msg.type === 'Results' && msg.is_final) {
      const alts = (msg.channel && msg.channel.alternatives) || [];
      const transcript = alts[0] && alts[0].transcript;
      if (transcript) {
        console.log(transcript);
      }
    }
  });
  socket.on('error', (err) => {
    console.error('Deepgram error:', err);
  });
  const closed = new Promise((resolve) => socket.on('close', () => resolve()));

  socket.connect();
  await Promise.race([
    socket.waitForOpen(),
    new Promise((_, reject) =>
      setTimeout(() => reject(new Error('Deepgram WebSocket open timeout after 10s')), 10000)
    ),
  ]);

  const mic = new Microphone({ sampleRate: 16000, channels: 1 });
  let buffer = Buffer.alloc(0);
  mic.on('data', (chunk) => {
    buffer = Buffer.concat([buffer, chunk]);
    while (buffer.length >= BATCH_BYTES) {
      socket.sendMedia(buffer.subarray(0, BATCH_BYTES));
      buffer = buffer.subarray(BATCH_BYTES);
    }
  });
  mic.on('error', (err) => {
    console.error('Mic error:', err.message);
  });

  process.on('SIGINT', async () => {
    console.log('\nStopping...');
    mic.stop();
    if (buffer.length >= MIN_BYTES) {
      socket.sendMedia(buffer);
    }
    socket.sendFinalize({ type: 'Finalize' });
    await new Promise((r) => setTimeout(r, 2000));
    socket.sendCloseStream({ type: 'CloseStream' });
    socket.close();
    await closed;
    process.exit(0);
  });

  console.log('Listening... (Ctrl+C to stop)');
};

run().catch(console.error);
import os
import threading
import time

from dotenv import load_dotenv
import decibri
from deepgram import DeepgramClient
from deepgram.core.events import EventType

BATCH_BYTES = 3200  # 100 ms of int16 mono at 16 kHz
MIN_BYTES = 1600    # 50 ms tail-drop

load_dotenv()
client = DeepgramClient(api_key=os.environ["DEEPGRAM_API_KEY"])

with client.listen.v2.connect(
    model="flux-general-en",
    eot_threshold=0.7,
    eot_timeout_ms=5000,
    encoding="linear16",
    sample_rate=16000,
) as connection:
    def on_open(_):
        print("Deepgram (Flux) connected.")

    def on_message(result):
        if result.get("event") != "EndOfTurn":
            return
        print(f"[turn {result.get('turn_index')}] {result.get('transcript') or ''} "
              f"(conf={result.get('end_of_turn_confidence')})")

    connection.on(EventType.OPEN, on_open)
    connection.on(EventType.MESSAGE, on_message)

    mic = decibri.Microphone(sample_rate=16000, channels=1)

    def audio_iter():
        buffer = bytearray()
        for chunk in mic:
            buffer.extend(chunk)
            while len(buffer) >= BATCH_BYTES:
                yield bytes(buffer[:BATCH_BYTES])
                del buffer[:BATCH_BYTES]
        if len(buffer) >= MIN_BYTES:
            yield bytes(buffer)

    print("Listening... (Ctrl+C to stop)")
    try:
        with mic:
            def stream():
                try:
                    for chunk in audio_iter():
                        connection.send_media(chunk)
                    time.sleep(2.0)
                finally:
                    connection.send_close_stream()
                    connection._websocket.close()

            sender = threading.Thread(target=stream, daemon=True)
            sender.start()
            connection.start_listening()
            sender.join(timeout=7.0)
    except KeyboardInterrupt:
        print("\nStopping...")
'use strict';
require('dotenv').config();
const { Microphone } = require('decibri');
const { DeepgramClient } = require('@deepgram/sdk');

const BATCH_BYTES = 3200; // 100 ms of int16 mono at 16 kHz
const MIN_BYTES = 1600;   // 50 ms tail-drop

const run = async () => {
  const KEY = process.env.DEEPGRAM_API_KEY;
  const client = new DeepgramClient({ apiKey: KEY });
  const socket = await client.listen.v2.connect({
    model: 'flux-general-en',
    encoding: 'linear16',
    sample_rate: 16000,
    eot_threshold: 0.7,
    eot_timeout_ms: 5000,
    Authorization: `Token ${KEY}`,
  });

  socket.on('open', () => {
    console.log('Deepgram (Flux) connected.');
  });
  socket.on('message', (msg) => {
    if (msg && msg.event === 'EndOfTurn') {
      console.log(`[turn ${msg.turn_index}] ${msg.transcript || ''} (conf=${msg.end_of_turn_confidence})`);
    }
  });
  socket.on('error', (err) => {
    console.error('Deepgram error:', err);
  });
  const closed = new Promise((resolve) => socket.on('close', () => resolve()));

  socket.connect();
  await Promise.race([
    socket.waitForOpen(),
    new Promise((_, reject) =>
      setTimeout(() => reject(new Error('Deepgram WebSocket open timeout after 10s')), 10000)
    ),
  ]);

  const mic = new Microphone({ sampleRate: 16000, channels: 1 });
  let buffer = Buffer.alloc(0);
  mic.on('data', (chunk) => {
    buffer = Buffer.concat([buffer, chunk]);
    while (buffer.length >= BATCH_BYTES) {
      socket.sendMedia(buffer.subarray(0, BATCH_BYTES));
      buffer = buffer.subarray(BATCH_BYTES);
    }
  });
  mic.on('error', (err) => {
    console.error('Mic error:', err.message);
  });

  process.on('SIGINT', async () => {
    console.log('\nStopping...');
    mic.stop();
    if (buffer.length >= MIN_BYTES) {
      socket.sendMedia(buffer);
    }
    await new Promise((r) => setTimeout(r, 2000));
    socket.sendCloseStream({ type: 'CloseStream' });
    socket.close();
    await closed;
    process.exit(0);
  });

  console.log('Listening... (Ctrl+C to stop)');
};

run().catch(console.error);
The connect args control how Deepgram processes your audio. Each product accepts a different set of options. Names are identical in Python and Node.
| Option | Value | Description |
|---|---|---|
| model | 'nova-3' | Required. The Nova-3 model selector. |
| encoding | 'linear16' | Audio encoding. Must match decibri's int16 output. |
| sample_rate | 16000 | Audio sample rate. Must match the rate decibri captures at. |
| language | 'en' | Language code. Use 'multi' for automatic language detection (subject to model support). |
| punctuate | false | Add punctuation to transcripts. |
| smart_format | false | Format numerals, currency, and dates. |
| diarize | false | Identify different speakers in the audio. |
| interim_results | false | Emit progressive results that refine as more audio is processed. |
| endpointing | 10 | Milliseconds of silence before a final result is triggered. |
| Authorization | `Token ${KEY}` | Required on Node. Pass in the connect args because the SDK's apiKey option flows to REST endpoints only. |
See the Deepgram Nova streaming reference for the complete option list.
| Option | Value | Description |
|---|---|---|
| model | 'flux-general-en' | Required. The Flux model selector. |
| encoding | 'linear16' | Audio encoding. Must match decibri's int16 output. |
| sample_rate | 16000 | Audio sample rate. Must match the rate decibri captures at. |
| eot_threshold | 0.7 | End-of-turn confidence threshold. Higher values require more confident silence detection before closing a turn. |
| eot_timeout_ms | 5000 | Maximum wait in milliseconds before forcing an end of turn even without high confidence. |
| Authorization | `Token ${KEY}` | Required on Node. Pass in the connect args because the SDK's apiKey option flows to REST endpoints only. |
See the Deepgram Flux streaming reference for additional options including event filtering and turn-update behaviour.
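To build intuition for how eot_threshold and eot_timeout_ms interact, the toy rule below follows the two descriptions in the Flux table: a turn ends once confidence reaches the threshold, or once the timeout has elapsed regardless of confidence. This is an illustrative model only and makes no claim about how Deepgram's turn detection is actually implemented. `turn_should_end` and its `ms_since_speech` parameter are hypothetical.

```python
def turn_should_end(confidence: float, ms_since_speech: int,
                    eot_threshold: float = 0.7,
                    eot_timeout_ms: int = 5000) -> bool:
    """Toy end-of-turn rule; not Deepgram's actual algorithm.

    confidence:      hypothetical end-of-turn confidence in [0, 1]
    ms_since_speech: hypothetical time since speech was last heard
    """
    if confidence >= eot_threshold:
        return True  # confident enough that the speaker has finished
    return ms_since_speech >= eot_timeout_ms  # forced end of turn


# A higher threshold keeps the turn open for the same confidence value.
print(turn_should_end(0.8, 300))                      # prints True
print(turn_should_end(0.8, 300, eot_threshold=0.9))   # prints False
```

Under this model, raising eot_threshold trades responsiveness for fewer premature turn endings, while eot_timeout_ms bounds how long a low-confidence turn can stay open.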