API 文档
API 文档默认已经集成在代码里面, 部署完成后可以通过下面的方式进行访问
API 访问
Version | Access method | example |
---|---|---|
< 2.0.0 | http://<url>/docs | http://192.168.244.144/docs |
>=2.0.0 | http://<url>/api/docs/ | http://192.168.244.144/api/docs/ |
>=2.6.0 | http://<url>/api/docs/ | http://192.168.244.144/api/docs/ |
版本小于 v2.6 需要打开 debug 模式
vi config.yml
...
# 如果版本更低的话,配置文件是 config.py
# Debug = true
DEBUG: true
API 认证
JumpServer API 支持的认证有以下几种方式
Session 登录后可以直接使用 session_id 作为认证方式
Token 获取一次性 Token,该 Token 有有效期, 过期作废
Private Token 永久 Token
Access Key 对 Http Header 进行签名
Session
用户通过页面后登录,cookie 中会存在 sessionid, 请求时同样把 sessionid 放到 cookie 中
Token
curl -X POST http://localhost/api/v1/authentication/auth/ \
-H 'Content-Type: application/json' \
-d '{"username": "admin", "password": "admin"}'
# pip install requests
import requests, json
jms_url = 'https://demo.jumpserver.org'
username = 'admin'
password = 'admin'
def get_token():
url = jms_url + '/api/v1/authentication/auth/'
query_args = {
"username": username,
"password": password
}
response = requests.post(url, data=query_args)
return json.loads(response.text)['token']
def get_user_info():
url = jms_url + '/api/v1/users/users/'
token = get_token()
headers = { "Authorization": 'Bearer ' + token }
response = requests.get(url, headers=headers)
print(json.loads(response.text))
get_user_info()
Private Token
docker exec -it jms_core /bin/bash
cd /opt/jumpserver/apps
python manage.py shell
from users.models import User
u = User.objects.get(username='admin')
u.create_private_token()
已经存在 private_token, 可以直接获取即可
u.private_token
以 PrivateToken: 937b38011acf499eb474e2fecb424ab3 为例:
curl -H 'Authorization: Token 937b38011acf499eb474e2fecb424ab3' \
-H "Content-Type:application/json" http://demo.jumpserver.org/api/v1/users/users/
# pip install requests
import requests, json
jms_url = 'https://demo.jumpserver.org'
jms_token = '937b38011acf499eb474e2fecb424ab3'
def get_user_info():
url = jms_url + '/api/v1/users/users/'
headers = { "Authorization": 'Token ' + jms_token }
response = requests.get(url, headers=headers)
print(json.loads(response.text))
get_user_info()
Access Key
在 Web 页面 API Key 列表创建或获取 AccessKeyID AccessKeySecret
# pip install requests drf-httpsig
import requests, datetime, json
from httpsig.requests_auth import HTTPSignatureAuth
jms_url = 'https://demo.jumpserver.org'
AccessKeyID = 'AccessKeyID'
AccessKeySecret = 'AccessKeySecret'
GMT_FORMAT = '%a, %d %b %Y %H:%M:%S GMT'
def get_auth():
signature_headers = ['(request-target)', 'accept', 'date']
auth = HTTPSignatureAuth(key_id=AccessKeyID, secret=AccessKeySecret, algorithm='hmac-sha256', headers=signature_headers)
return auth
def get_user_info():
url = jms_url + '/api/v1/users/users/'
auth = get_auth()
headers = {
'Accept': 'application/json',
'Date': datetime.datetime.utcnow().strftime(GMT_FORMAT)
}
response = requests.get(url, auth=auth, headers=headers)
print(json.loads(response.text))
get_user_info()
示例
Token
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import sys, requests, time
class HTTP:
server = None
token = None
@classmethod
def get_token(cls, username, password):
data = {'username': username, 'password': password}
url = "/api/v1/authentication/auth/"
res = requests.post(cls.server + url, data)
res_data = res.json()
if res.status_code in [200, 201] and res_data:
token = res_data.get('token')
cls.token = token
else:
print("获取 token 错误, 请检查输入项是否正确")
sys.exit()
@classmethod
def get(cls, url, params=None, **kwargs):
url = cls.server + url
headers = {
'Authorization': "Bearer {}".format(cls.token)
}
kwargs['headers'] = headers
res = requests.get(url, params, **kwargs)
return res
@classmethod
def post(cls, url, data=None, json=None, **kwargs):
url = cls.server + url
headers = {
'Authorization': "Bearer {}".format(cls.token)
}
kwargs['headers'] = headers
res = requests.post(url, data, json, **kwargs)
return res
class User(object):
def __init__(self):
self.id = None
self.name = user_name
self.username = user_username
self.email = user_email
def exist(self):
url = '/api/v1/users/users/'
params = {'username': self.username}
res = HTTP.get(url, params=params)
res_data = res.json()
if res.status_code in [200, 201] and res_data:
self.id = res_data[0].get('id')
else:
self.create()
def create(self):
print("创建用户 {}".format(self.username))
url = '/api/v1/users/users/'
data = {
'name': self.name,
'username': self.username,
'email': self.email,
'is_active': True
}
res = HTTP.post(url, json=data)
self.id = res.json().get('id')
def perform(self):
self.exist()
class Node(object):
def __init__(self):
self.id = None
self.name = asset_node_name
def exist(self):
url = '/api/v1/assets/nodes/'
params = {'value': self.name}
res = HTTP.get(url, params=params)
res_data = res.json()
if res.status_code in [200, 201] and res_data:
self.id = res_data[0].get('id')
else:
self.create()
def create(self):
print("创建资产节点 {}".format(self.name))
url = '/api/v1/assets/nodes/'
data = {
'value': self.name
}
res = HTTP.post(url, json=data)
self.id = res.json().get('id')
def perform(self):
self.exist()
class AdminUser(object):
def __init__(self):
self.id = None
self.name = assets_admin_name
self.username = assets_admin_username
self.password = assets_admin_password
def exist(self):
url = '/api/v1/assets/admin-user/'
params = {'username': self.name}
res = HTTP.get(url, params=params)
res_data = res.json()
if res.status_code in [200, 201] and res_data:
self.id = res_data[0].get('id')
else:
self.create()
def create(self):
print("创建管理用户 {}".format(self.name))
url = '/api/v1/assets/admin-users/'
data = {
'name': self.name,
'username': self.username,
'password': self.password
}
res = HTTP.post(url, json=data)
self.id = res.json().get('id')
def perform(self):
self.exist()
class Asset(object):
def __init__(self):
self.id = None
self.name = asset_name
self.ip = asset_ip
self.platform = asset_platform
self.protocols = asset_protocols
self.admin_user = AdminUser()
self.node = Node()
def exist(self):
url = '/api/v1/assets/assets/'
params = {
'hostname': self.name
}
res = HTTP.get(url, params)
res_data = res.json()
if res.status_code in [200, 201] and res_data:
self.id = res_data[0].get('id')
else:
self.create()
def create(self):
print("创建资产 {}".format(self.ip))
self.admin_user.perform()
self.node.perform()
url = '/api/v1/assets/assets/'
data = {
'hostname': self.ip,
'ip': self.ip,
'platform': self.platform,
'protocols': self.protocols,
'admin_user': self.admin_user.id,
'nodes': [self.node.id],
'is_active': True
}
res = HTTP.post(url, json=data)
self.id = res.json().get('id')
def perform(self):
self.exist()
class SystemUser(object):
def __init__(self):
self.id = None
self.name = assets_system_name
self.username = assets_system_username
def exist(self):
url = '/api/v1/assets/system-users/'
params = {'name': self.name}
res = HTTP.get(url, params)
res_data = res.json()
if res.status_code in [200, 201] and res_data:
self.id = res_data[0].get('id')
else:
self.create()
def create(self):
print("创建系统用户 {}".format(self.name))
url = '/api/v1/assets/system-users/'
data = {
'name': self.name,
'username': self.username,
'login_mode': 'auto',
'protocol': 'ssh',
'auto_push': True,
'sudo': 'All',
'shell': '/bin/bash',
'auto_generate_key': True,
'is_active': True
}
res = HTTP.post(url, json=data)
self.id = res.json().get('id')
def perform(self):
self.exist()
class AssetPermission(object):
def __init__(self):
self.name = perm_name
self.user = User()
self.asset = Asset()
self.system_user = SystemUser()
def create(self):
print("创建资产授权名称 {}".format(self.name))
url = '/api/v1/perms/asset-permissions/'
data = {
'name': self.name,
'users': [self.user.id],
'assets': [self.asset.id],
'system_users': [self.system_user.id],
'actions': ['all'],
'is_active': True,
'date_start': perm_date_start,
'date_expired': perm_date_expired
}
res = HTTP.post(url, json=data)
res_data = res.json()
if res.status_code in [200, 201] and res_data:
print("创建资产授权规则成功: ", res_data)
else:
print("创建授权规则失败: ", res_data)
def perform(self):
self.user.perform()
self.asset.perform()
self.system_user.perform()
self.create()
class APICreateAssetPermission(object):
def __init__(self):
self.jms_url = jms_url
self.username = jms_username
self.password = jms_password
self.token = None
self.server = None
def init_http(self):
HTTP.server = self.jms_url
HTTP.get_token(self.username, self.password)
def perform(self):
self.init_http()
self.perm = AssetPermission()
self.perm.perform()
if __name__ == '__main__':
# jumpserver url 地址
jms_url = 'http://192.168.100.244'
# 管理员账户
jms_username = 'admin'
jms_password = 'admin'
# 资产节点
asset_node_name = 'test'
# 资产信息
asset_name = '192.168.100.1'
asset_ip = '192.168.100.1'
asset_platform = 'Linux'
asset_protocols = ['ssh/22']
# 资产管理用户
assets_admin_name = 'test_root'
assets_admin_username = 'root'
assets_admin_password = 'test123456'
# 资产系统用户
assets_system_name = 'test'
assets_system_username = 'test'
# 用户用户名
user_name = '测试用户'
user_username = 'test'
user_email = 'test@jumpserver.org'
# 资产授权
perm_name = 'AutoPerm' +'_'+ (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
perm_date_start = '2021-05-01 14:25:47 +0800'
perm_date_expired = '2021-06-01 14:25:47 +0800'
api = APICreateAssetPermission()
api.perform()
Access Key
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import sys, requests, time, datetime
from httpsig.requests_auth import HTTPSignatureAuth
class HTTP:
server = None
auth = None
@classmethod
def get_auth(cls, accesskeyid, accesskeysecret):
signature_headers = ['(request-target)', 'accept', 'date']
auth = HTTPSignatureAuth(key_id=accesskeyid, secret=accesskeysecret, algorithm='hmac-sha256', headers=signature_headers)
cls.auth = auth
@classmethod
def get(cls, url, params=None, **kwargs):
url = cls.server + url
GMT_FORMAT = '%a, %d %b %Y %H:%M:%S GMT'
headers = {
'Accept': 'application/json',
'Date': datetime.datetime.utcnow().strftime(GMT_FORMAT)
}
kwargs['auth'] = cls.auth
kwargs['headers'] = headers
res = requests.get(url, params, **kwargs)
return res
@classmethod
def post(cls, url, data=None, json=None, **kwargs):
url = cls.server + url
GMT_FORMAT = '%a, %d %b %Y %H:%M:%S GMT'
headers = {
'Accept': 'application/json',
'Date': datetime.datetime.utcnow().strftime(GMT_FORMAT)
}
kwargs['auth'] = cls.auth
kwargs['headers'] = headers
res = requests.post(url, data, json, **kwargs)
return res
class User(object):
def __init__(self):
self.id = None
self.name = user_name
self.username = user_username
self.email = user_email
def exist(self):
url = '/api/v1/users/users/'
params = {'username': self.username}
res = HTTP.get(url, params=params)
res_data = res.json()
if res.status_code in [200, 201] and res_data:
self.id = res_data[0].get('id')
else:
self.create()
def create(self):
print("创建用户 {}".format(self.username))
url = '/api/v1/users/users/'
data = {
'name': self.name,
'username': self.username,
'email': self.email,
'is_active': True
}
res = HTTP.post(url, json=data)
self.id = res.json().get('id')
def perform(self):
self.exist()
class Node(object):
def __init__(self):
self.id = None
self.name = asset_node_name
def exist(self):
url = '/api/v1/assets/nodes/'
params = {'value': self.name}
res = HTTP.get(url, params=params)
res_data = res.json()
if res.status_code in [200, 201] and res_data:
self.id = res_data[0].get('id')
else:
self.create()
def create(self):
print("创建资产节点 {}".format(self.name))
url = '/api/v1/assets/nodes/'
data = {
'value': self.name
}
res = HTTP.post(url, json=data)
self.id = res.json().get('id')
def perform(self):
self.exist()
class AdminUser(object):
def __init__(self):
self.id = None
self.name = assets_admin_name
self.username = assets_admin_username
self.password = assets_admin_password
def exist(self):
url = '/api/v1/assets/admin-user/'
params = {'username': self.name}
res = HTTP.get(url, params=params)
res_data = res.json()
if res.status_code in [200, 201] and res_data:
self.id = res_data[0].get('id')
else:
self.create()
def create(self):
print("创建管理用户 {}".format(self.name))
url = '/api/v1/assets/admin-users/'
data = {
'name': self.name,
'username': self.username,
'password': self.password
}
res = HTTP.post(url, json=data)
self.id = res.json().get('id')
def perform(self):
self.exist()
class Asset(object):
def __init__(self):
self.id = None
self.name = asset_name
self.ip = asset_ip
self.platform = asset_platform
self.protocols = asset_protocols
self.admin_user = AdminUser()
self.node = Node()
def exist(self):
url = '/api/v1/assets/assets/'
params = {
'hostname': self.name
}
res = HTTP.get(url, params)
res_data = res.json()
if res.status_code in [200, 201] and res_data:
self.id = res_data[0].get('id')
else:
self.create()
def create(self):
print("创建资产 {}".format(self.ip))
self.admin_user.perform()
self.node.perform()
url = '/api/v1/assets/assets/'
data = {
'hostname': self.ip,
'ip': self.ip,
'platform': self.platform,
'protocols': self.protocols,
'admin_user': self.admin_user.id,
'nodes': [self.node.id],
'is_active': True
}
res = HTTP.post(url, json=data)
self.id = res.json().get('id')
def perform(self):
self.exist()
class SystemUser(object):
def __init__(self):
self.id = None
self.name = assets_system_name
self.username = assets_system_username
def exist(self):
url = '/api/v1/assets/system-users/'
params = {'name': self.name}
res = HTTP.get(url, params)
res_data = res.json()
if res.status_code in [200, 201] and res_data:
self.id = res_data[0].get('id')
else:
self.create()
def create(self):
print("创建系统用户 {}".format(self.name))
url = '/api/v1/assets/system-users/'
data = {
'name': self.name,
'username': self.username,
'login_mode': 'auto',
'protocol': 'ssh',
'auto_push': True,
'sudo': 'All',
'shell': '/bin/bash',
'auto_generate_key': True,
'is_active': True
}
res = HTTP.post(url, json=data)
self.id = res.json().get('id')
def perform(self):
self.exist()
class AssetPermission(object):
def __init__(self):
self.name = perm_name
self.user = User()
self.asset = Asset()
self.system_user = SystemUser()
def create(self):
print("创建资产授权名称 {}".format(self.name))
url = '/api/v1/perms/asset-permissions/'
data = {
'name': self.name,
'users': [self.user.id],
'assets': [self.asset.id],
'system_users': [self.system_user.id],
'actions': ['all'],
'is_active': True,
'date_start': perm_date_start,
'date_expired': perm_date_expired
}
res = HTTP.post(url, json=data)
res_data = res.json()
if res.status_code in [200, 201] and res_data:
print("创建资产授权规则成功: ", res_data)
else:
print("创建授权规则失败: ", res_data)
def perform(self):
self.user.perform()
self.asset.perform()
self.system_user.perform()
self.create()
class APICreateAssetPermission(object):
def __init__(self):
self.jms_url = jms_url
self.accesskeyid = jms_accesskeyid
self.accesskeysecret = jms_accesskeysecret
self.auth = None
self.server = None
def init_http(self):
HTTP.server = self.jms_url
HTTP.get_auth(self.accesskeyid, self.accesskeysecret)
def perform(self):
self.init_http()
self.perm = AssetPermission()
self.perm.perform()
if __name__ == '__main__':
# jumpserver url 地址
jms_url = 'http://192.168.100.244'
# 管理员 AK SK
jms_accesskeyid = ''
jms_accesskeysecret = ''
# 资产节点
asset_node_name = 'test'
# 资产信息
asset_name = '192.168.100.1'
asset_ip = '192.168.100.1'
asset_platform = 'Linux'
asset_protocols = ['ssh/22']
# 资产管理用户
assets_admin_name = 'test_root'
assets_admin_username = 'root'
assets_admin_password = 'test123456'
# 资产系统用户
assets_system_name = 'test'
assets_system_username = 'test'
# 用户用户名
user_name = '测试用户'
user_username = 'test'
user_email = 'test@jumpserver.org'
# 资产授权
perm_name = 'AutoPerm' +'_'+ (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
perm_date_start = '2021-05-01 14:25:47 +0800'
perm_date_expired = '2021-06-01 14:25:47 +0800'
api = APICreateAssetPermission()
api.perform()
当前内容版权归 jumpserver 或其关联方所有,如需对内容或内容相关联开源项目进行关注与资助,请访问 jumpserver .