Dead-simple to use GraphQL client over websocket. Using the apollo-transport-ws protocol.
pip install py-graphql-client
from graphql_client import GraphQLClient
query = """
subscription {
notifications {
id
title
content
}
}
"""
def callback(_id, data):
print("got new data..")
print(f"msg id: {_id}. data: {data}")
with GraphQLClient('ws://localhost:8080/graphql') as client:
sub_id = client.subscribe(query, callback=callback)
# do other stuff
# ...
# later stop the subscription
client.stop_subscribe(sub_id)
from graphql_client import GraphQLClient
query = """
subscription ($limit: Int!) {
notifications (order_by: {created: "desc"}, limit: $limit) {
id
title
content
}
}
"""
def callback(_id, data):
print("got new data..")
print(f"msg id: {_id}. data: {data}")
with GraphQLClient('ws://localhost:8080/graphql') as client:
sub_id = client.subscribe(query, variables={'limit': 10}, callback=callback)
# ...
from graphql_client import GraphQLClient
query = """
subscription ($limit: Int!) {
notifications (order_by: {created: "desc"}, limit: $limit) {
id
title
content
}
}
"""
def callback(_id, data):
print("got new data..")
print(f"msg id: {_id}. data: {data}")
with GraphQLClient('ws://localhost:8080/graphql') as client:
sub_id = client.subscribe(query,
variables={'limit': 10},
headers={'Authorization': 'Bearer xxxx'},
callback=callback)
...
client.stop_subscribe(sub_id)
from graphql_client import GraphQLClient
query = """
query ($limit: Int!) {
notifications (order_by: {created: "desc"}, limit: $limit) {
id
title
content
}
}
"""
with GraphQLClient('ws://localhost:8080/graphql') as client:
res = client.query(query, variables={'limit': 10}, headers={'Authorization': 'Bearer xxxx'})
print(res)
from graphql_client import GraphQLClient
query = """
query ($limit: Int!) {
notifications (order_by: {created: "desc"}, limit: $limit) {
id
title
content
}
}
"""
client = GraphQLClient('ws://localhost:8080/graphql')
res = client.query(query, variables={'limit': 10}, headers={'Authorization': 'Bearer xxxx'})
print(res)
client.close()
- support http as well
- should use asyncio websocket library?