Add this skill
npx mdskills install sickn33/azure-messaging-webpubsubservice-py
Comprehensive Azure WebSocket SDK reference with clear examples, but it lacks agent-specific instructions.
Real-time messaging with WebSocket connections at scale.
# Service SDK (server-side)
pip install azure-messaging-webpubsubservice
# Client SDK (for Python WebSocket clients)
pip install azure-messaging-webpubsubclient
AZURE_WEBPUBSUB_CONNECTION_STRING=Endpoint=https://<resource-name>.webpubsub.azure.com;AccessKey=...
AZURE_WEBPUBSUB_HUB=my-hub
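The connection string packs the endpoint and access key into `Key=Value` pairs separated by semicolons. A minimal sketch of splitting one apart for a startup sanity check might look like this; `parse_connection_string` is a hypothetical helper (not part of the SDK) and the sample value is made up.

```python
def parse_connection_string(connection_string: str) -> dict:
    """Split 'Key=Value;Key=Value' pairs into a dict (hypothetical helper)."""
    parts = {}
    for segment in connection_string.split(";"):
        if not segment.strip():
            continue  # tolerate a trailing semicolon
        # Split on the first '=' only: base64 access keys often end in '='.
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError(f"Malformed segment: {segment!r}")
        parts[key.strip()] = value.strip()
    for required in ("Endpoint", "AccessKey"):
        if required not in parts:
            raise ValueError(f"Connection string is missing {required}")
    return parts


# Made-up sample value, for illustration only.
sample = "Endpoint=https://example.webpubsub.azure.com;AccessKey=abc123==;Version=1.0;"
parsed = parse_connection_string(sample)
print(parsed["Endpoint"])
```

Failing fast on a malformed value at startup tends to give a clearer error than a rejected request later on.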
import os
from azure.messaging.webpubsubservice import WebPubSubServiceClient
# Connection string
client = WebPubSubServiceClient.from_connection_string(
    connection_string=os.environ["AZURE_WEBPUBSUB_CONNECTION_STRING"],
    hub="my-hub"
)
# Entra ID
from azure.identity import DefaultAzureCredential
client = WebPubSubServiceClient(
    endpoint="https://<resource-name>.webpubsub.azure.com",
    hub="my-hub",
    credential=DefaultAzureCredential()
)
# Token for anonymous user
token = client.get_client_access_token()
print(f"URL: {token['url']}")
# Token with user ID
token = client.get_client_access_token(
    user_id="user123",
    roles=["webpubsub.sendToGroup", "webpubsub.joinLeaveGroup"]
)
# Token with groups
token = client.get_client_access_token(
    user_id="user123",
    groups=["group1", "group2"]
)
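The roles shown above apply to every group in the hub. Role strings can also be scoped to one group by appending its name (for example `webpubsub.sendToGroup.group1`). The sketch below builds such a list; `roles_for_groups` is a hypothetical helper, not an SDK function.

```python
def roles_for_groups(groups, can_send=True):
    """Build role strings limited to the given groups (hypothetical helper)."""
    roles = []
    for group in groups:
        # Allow joining and leaving this one group only.
        roles.append(f"webpubsub.joinLeaveGroup.{group}")
        if can_send:
            # Optionally allow publishing to this one group.
            roles.append(f"webpubsub.sendToGroup.{group}")
    return roles


roles = roles_for_groups(["group1", "group2"])
print(roles)
```

The resulting list could then be passed as `roles=...` to `get_client_access_token` in place of the hub-wide roles.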
# Send text
client.send_to_all(message="Hello everyone!", content_type="text/plain")
# Send JSON
client.send_to_all(
    message={"type": "notification", "data": "Hello"},
    content_type="application/json"
)
client.send_to_user(
    user_id="user123",
    message="Hello user!",
    content_type="text/plain"
)
client.send_to_group(
    group="my-group",
    message="Hello group!",
    content_type="text/plain"
)
client.send_to_connection(
    connection_id="abc123",
    message="Hello connection!",
    content_type="text/plain"
)
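Every send call above pairs a message with a matching `content_type`. One way to keep the two in step is a small wrapper that picks the content type from the payload's Python type. This is only a sketch: `send_any` and `RecordingClient` are hypothetical names, and the stand-in class exists so the example runs without an Azure resource.

```python
def send_any(client, payload):
    """Broadcast payload, choosing content_type from its Python type.

    `client` is any object with a send_to_all(message=..., content_type=...)
    method, such as the service client created earlier.
    """
    if isinstance(payload, str):
        content_type = "text/plain"
    elif isinstance(payload, (dict, list)):
        content_type = "application/json"
    else:
        raise TypeError(f"Unsupported payload type: {type(payload).__name__}")
    client.send_to_all(message=payload, content_type=content_type)
    return content_type


class RecordingClient:
    """Stand-in that records calls instead of contacting the service."""

    def __init__(self):
        self.sent = []

    def send_to_all(self, message, content_type):
        self.sent.append((message, content_type))


fake = RecordingClient()
send_any(fake, "Hello everyone!")
send_any(fake, {"type": "notification", "data": "Hello"})
print(fake.sent)
```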
# Add user to group
client.add_user_to_group(group="my-group", user_id="user123")
# Remove user from group
client.remove_user_from_group(group="my-group", user_id="user123")
# Add connection to group
client.add_connection_to_group(group="my-group", connection_id="abc123")
# Remove connection from group
client.remove_connection_from_group(group="my-group", connection_id="abc123")
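When group membership is driven by application data, the add and remove calls above can be derived from a set difference. The sketch below assumes the application tracks current membership itself; `sync_group_users` and `FakeGroupClient` are hypothetical names used only for illustration.

```python
def sync_group_users(client, group, current_users, desired_users):
    """Add and remove users so the group matches desired_users."""
    current, desired = set(current_users), set(desired_users)
    to_add = sorted(desired - current)
    to_remove = sorted(current - desired)
    for user_id in to_add:
        client.add_user_to_group(group=group, user_id=user_id)
    for user_id in to_remove:
        client.remove_user_from_group(group=group, user_id=user_id)
    return to_add, to_remove


class FakeGroupClient:
    """Stand-in that records calls instead of contacting the service."""

    def __init__(self):
        self.calls = []

    def add_user_to_group(self, group, user_id):
        self.calls.append(("add", group, user_id))

    def remove_user_from_group(self, group, user_id):
        self.calls.append(("remove", group, user_id))


fake_groups = FakeGroupClient()
added, removed = sync_group_users(
    fake_groups, "my-group",
    current_users=["alice", "bob"],
    desired_users=["bob", "carol"],
)
print(added, removed)
```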
# Check if connection exists
exists = client.connection_exists(connection_id="abc123")
# Check if user has connections
exists = client.user_exists(user_id="user123")
# Check if group has connections
exists = client.group_exists(group="my-group")
# Close connection
client.close_connection(connection_id="abc123", reason="Session ended")
# Close all connections for user
client.close_user_connections(user_id="user123")
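The existence check and the close call can be combined so that stale connection IDs are skipped. A minimal sketch follows; `close_if_connected` and `FakeConnectionClient` are hypothetical names, and the stand-in class only simulates the two calls used here.

```python
def close_if_connected(client, connection_id, reason):
    """Close the connection only if the service still reports it."""
    if client.connection_exists(connection_id=connection_id):
        client.close_connection(connection_id=connection_id, reason=reason)
        return True
    return False


class FakeConnectionClient:
    """Stand-in that keeps live connection IDs in a set."""

    def __init__(self, live):
        self.live = set(live)

    def connection_exists(self, connection_id):
        return connection_id in self.live

    def close_connection(self, connection_id, reason):
        self.live.discard(connection_id)


fake_conn = FakeConnectionClient(live={"abc123"})
first = close_if_connected(fake_conn, "abc123", "Session ended")
second = close_if_connected(fake_conn, "abc123", "Session ended")
print(first, second)
```

Against the real service a connection can still drop between the check and the close, so callers would probably want to handle an error from `close_connection` as well.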
from azure.messaging.webpubsubservice import WebPubSubServiceClient
# Grant permission
client.grant_permission(
    permission="joinLeaveGroup",
    connection_id="abc123",
    target_name="my-group"
)
# Revoke permission
client.revoke_permission(
    permission="joinLeaveGroup",
    connection_id="abc123",
    target_name="my-group"
)
# Check permission
has_permission = client.has_permission(
    permission="joinLeaveGroup",
    connection_id="abc123",
    target_name="my-group"
)
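Permissions are granted per connection and per group, so giving one connection access to several groups means repeating the grant call. The sketch below loops over both group permissions; `grant_group_access` and `FakePermissionClient` are hypothetical names, and `sendToGroup` is assumed to be accepted alongside the `joinLeaveGroup` value used above.

```python
GROUP_PERMISSIONS = ("joinLeaveGroup", "sendToGroup")


def grant_group_access(client, connection_id, groups):
    """Grant join/leave and send permissions on each group."""
    granted = []
    for group in groups:
        for permission in GROUP_PERMISSIONS:
            client.grant_permission(
                permission=permission,
                connection_id=connection_id,
                target_name=group,
            )
            granted.append((permission, group))
    return granted


class FakePermissionClient:
    """Stand-in that records grants instead of contacting the service."""

    def __init__(self):
        self.grants = set()

    def grant_permission(self, permission, connection_id, target_name):
        self.grants.add((permission, connection_id, target_name))


fake_perm = FakePermissionClient()
granted = grant_group_access(fake_perm, "abc123", ["my-group", "other-group"])
print(granted)
```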
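from azure.messaging.webpubsubclient import WebPubSubClient
from azure.messaging.webpubsubclient.models import CallbackType, WebPubSubDataType
client = WebPubSubClient(credential=token["url"])
# Event handlers (the Python client registers them with subscribe())
def on_connected(e):
    print(f"Connected: {e.connection_id}")
def on_message(e):
    print(f"Message: {e.data}")
def on_group_message(e):
    print(f"Group {e.group}: {e.data}")
client.subscribe(CallbackType.CONNECTED, on_connected)
client.subscribe(CallbackType.SERVER_MESSAGE, on_message)
client.subscribe(CallbackType.GROUP_MESSAGE, on_group_message)
# Connect and send
client.open()
client.send_to_group("my-group", "Hello from Python!", WebPubSubDataType.TEXT)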
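The handlers above receive an event object and read attributes such as `e.group` and `e.data`. A handler that stores messages instead of printing them could look like the sketch below. The `inbox` dictionary is a hypothetical name, and the events are simulated with `SimpleNamespace` so the example runs without a live connection.

```python
from collections import defaultdict
from types import SimpleNamespace

# Messages received so far, keyed by group name.
inbox = defaultdict(list)


def on_group_message(e):
    """Store each group message under its group name."""
    inbox[e.group].append(e.data)


# Simulated events carrying the same attributes the handler reads.
on_group_message(SimpleNamespace(group="my-group", data="Hello from Python!"))
on_group_message(SimpleNamespace(group="my-group", data="Second message"))
on_group_message(SimpleNamespace(group="other-group", data="Elsewhere"))
print(dict(inbox))
```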
from azure.messaging.webpubsubservice.aio import WebPubSubServiceClient
from azure.identity.aio import DefaultAzureCredential
async def broadcast():
    credential = DefaultAzureCredential()
    client = WebPubSubServiceClient(
        endpoint="https://<resource-name>.webpubsub.azure.com",
        hub="my-hub",
        credential=credential
    )
    await client.send_to_all("Hello async!", content_type="text/plain")
    await client.close()
    await credential.close()
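With the async client, sends to several groups can run concurrently instead of one after another. The sketch below uses `asyncio.gather` and reports which groups failed; `send_to_groups` and `FakeAsyncClient` are hypothetical names, and the stand-in class only mimics the keyword arguments used in this document.

```python
import asyncio


async def send_to_groups(client, groups, message):
    """Send one text message to several groups concurrently.

    Returns a dict mapping each failed group to the exception it raised.
    """
    results = await asyncio.gather(
        *(
            client.send_to_group(
                group=group, message=message, content_type="text/plain"
            )
            for group in groups
        ),
        return_exceptions=True,  # collect failures instead of aborting
    )
    return {g: r for g, r in zip(groups, results) if isinstance(r, Exception)}


class FakeAsyncClient:
    """Stand-in async client that fails for selected groups."""

    def __init__(self, failing=()):
        self.failing = set(failing)
        self.sent = []

    async def send_to_group(self, group, message, content_type):
        if group in self.failing:
            raise RuntimeError(f"cannot reach {group}")
        self.sent.append(group)


fake_async = FakeAsyncClient(failing={"group2"})
failures = asyncio.run(
    send_to_groups(fake_async, ["group1", "group2", "group3"], "Hello async!")
)
print(sorted(fake_async.sent), list(failures))
```

Collecting exceptions rather than raising on the first one lets the remaining groups still receive the message.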
| Operation | Description |
|---|---|
| `get_client_access_token` | Generate WebSocket connection URL |
| `send_to_all` | Broadcast to all connections |
| `send_to_user` | Send to specific user |
| `send_to_group` | Send to group members |
| `send_to_connection` | Send to specific connection |
| `add_user_to_group` | Add user to group |
| `remove_user_from_group` | Remove user from group |
| `close_connection` | Disconnect client |
| `connection_exists` | Check connection status |
Install via CLI
npx mdskills install sickn33/azure-messaging-webpubsubservice-py
Azure Messaging Webpubsubservice Py is a free, open-source AI agent skill. Installing it downloads the skill files into your project, and your AI agent picks them up automatically.
Azure Messaging Webpubsubservice Py works with Claude Code, Claude Desktop, Cursor, VS Code Copilot, Windsurf, Continue Dev, Codex, Gemini CLI, Amp, Roo Code, Goose, Opencode, Trae, Qodo, Command Code. Skills use the open SKILL.md format, which is compatible with any AI coding agent that reads markdown instructions.