Mbed Host Tests
module_power_cycle_mbed.py
Go to the documentation of this file.
1"""
2mbed SDK
3Copyright (c) 2011-2015 ARM Limited
4
5Licensed under the Apache License, Version 2.0 (the "License");
6you may not use this file except in compliance with the License.
7You may obtain a copy of the License at
8
9 http://www.apache.org/licenses/LICENSE-2.0
10
11Unless required by applicable law or agreed to in writing, software
12distributed under the License is distributed on an "AS IS" BASIS,
13WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14See the License for the specific language governing permissions and
15limitations under the License.
16
17Author: Przemyslaw Wirkus <Przemyslaw.Wirkus@arm.com>
18"""
19
20import os
21import json
22import time
23import requests
24from .host_test_plugins import HostTestPluginBase
25
26
class HostTestPluginPowerCycleResetMethod(HostTestPluginBase):
    """! Reset-method plugin which power cycles a target via the mbed TAS RM service.

    @details The service address is read from the MBED_TAS_RM_IP and
             MBED_TAS_RM_PORT environment variables. The target is switched
             OFF, then ON, and the plugin polls the service until the target
             reports the requested state (and, for ON, a mount point other
             than "Not Connected").
    """

    # Plugin interface
    name = 'HostTestPluginPowerCycleResetMethod'
    type = 'ResetMethod'
    stable = True
    capabilities = ['power_cycle']
    required_parameters = ['target_id', 'device_info']

    # Upper bound (seconds) for one state poll and the delay between two polls
    _POLL_TIMEOUT_S = 300
    _POLL_INTERVAL_S = 2

    def __init__(self):
        """! ctor
        """
        HostTestPluginBase.__init__(self)

    def setup(self, *args, **kwargs):
        """! Configure plugin, this function should be called before plugin execute() method is used.

        @return Always True, this plugin needs no configuration
        """
        return True

    def execute(self, capability, *args, **kwargs):
        """! Executes capability by name

        @param capability Capability name
        @param args Additional arguments
        @param kwargs Additional arguments, must contain a non-empty 'target_id'
                      and a dict 'device_info' (updated in place on success)
        @details Each capability e.g. may directly just call some command line program or execute building pythonic function
        @return True if the target was power cycled successfully, False otherwise
        """
        if not kwargs.get('target_id'):
            self.print_plugin_error("Error: This plugin requires mbed target_id")
            return False

        # isinstance (rather than an exact type check) also accepts dict subclasses
        if not isinstance(kwargs.get('device_info'), dict):
            self.print_plugin_error("Error: This plugin requires dict parameter 'device_info' passed by the caller.")
            return False

        result = False
        if self.check_parameters(capability, *args, **kwargs) is True:
            if capability in HostTestPluginPowerCycleResetMethod.capabilities:
                target_id = kwargs['target_id']
                device_info = kwargs['device_info']
                addr = self.__get_mbed_tas_rm_addr()
                if addr:
                    ip, port = addr
                    result = self.__hw_reset(ip, port, target_id, device_info)
        return result

    def __get_mbed_tas_rm_addr(self):
        """! Get IP and port of the mbed TAS RM service from the environment

        @return Tuple (ip, port) read from MBED_TAS_RM_IP and MBED_TAS_RM_PORT,
                or None (after printing a plugin error) if either is not set
        """
        try:
            ip = os.environ['MBED_TAS_RM_IP']
            port = os.environ['MBED_TAS_RM_PORT']
            return ip, port
        except KeyError as e:
            self.print_plugin_error("HOST: Failed to read environment variable (" + str(e) + "). Can't perform hardware reset.")

        return None

    @staticmethod
    def __switch_request(target_id, command):
        """! Build a 'switchResource' request for a single mbed platform

        @param target_id Resource id of the target
        @param command Switch command, this plugin uses "OFF", "ON" and "STATE"
        @return Request dictionary ready to be serialised to JSON
        """
        return {
            "name": "switchResource",
            "sub_requests": [
                {
                    "resource_type": "mbed_platform",
                    "resource_id": target_id,
                    "switch_command": command
                }
            ]
        }

    def __poll_state(self, ip, port, target_id, required_state):
        """! Poll the service until the target reaches required_state or the poll times out

        @param ip Service IP address
        @param port Service port
        @param target_id Resource id of the target
        @param required_state State to wait for ("ON" or "OFF")
        @details For "ON" the poll additionally waits until the reported
                 mount point is no longer "Not Connected".
        @return Last response received; None if a request failed
        """
        # The same STATE request is sent on every iteration; the previous
        # response is never fed back to the service as a request.
        state_req = self.__switch_request(target_id, "STATE")
        resp = self.__run_request(ip, port, state_req)
        start = time.time()
        while resp and (resp['sub_requests'][0]['state'] != required_state or (required_state == 'ON' and
                        resp['sub_requests'][0]["mount_point"] == "Not Connected")) and \
                (time.time() - start) < self._POLL_TIMEOUT_S:
            time.sleep(self._POLL_INTERVAL_S)
            resp = self.__run_request(ip, port, state_req)
        return resp

    def __hw_reset(self, ip, port, target_id, device_info):
        """! Reset target device using TAS RM API

        @param ip Service IP address
        @param port Service port
        @param target_id Resource id of the target to power cycle
        @param device_info Dictionary updated in place with the fields of the
                           final state response when the reset succeeds
        @return True if the target came back ON with a mount point, False otherwise
        """
        # Switch the target off
        switch_off_resp = self.__run_request(ip, port, self.__switch_request(target_id, "OFF"))
        if switch_off_resp is None:
            self.print_plugin_error("HOST: Failed to communicate with TAS RM!")
            return False

        if "error" in switch_off_resp['sub_requests'][0]:
            self.print_plugin_error("HOST: Failed to reset target. error = %s" % switch_off_resp['sub_requests'][0]['error'])
            return False

        self.__poll_state(ip, port, target_id, "OFF")

        # Switch the target back on; the outcome is judged by the state poll below
        self.__run_request(ip, port, self.__switch_request(target_id, "ON"))
        resp = self.__poll_state(ip, port, target_id, "ON")
        if resp and resp['sub_requests'][0]['state'] == 'ON' and resp['sub_requests'][0]["mount_point"] != "Not Connected":
            # dict.update() works on both Python 2 and 3 (viewitems() is Python 2 only)
            device_info.update(resp['sub_requests'][0])
            return True

        self.print_plugin_error("HOST: Failed to reset device %s" % target_id)
        return False

    @staticmethod
    def __run_request(ip, port, request):
        """! Send a request to the service and decode the JSON reply

        @param ip Service IP address
        @param port Service port
        @param request Request dictionary, sent JSON-encoded as the body of a GET
        @return Decoded JSON response if the HTTP status is 200; None on any
                other status, on a transport error or on an undecodable body
        """
        headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
        try:
            get_resp = requests.get("http://%s:%s/" % (ip, port), data=json.dumps(request), headers=headers)
            # Check the status first: an error page is not required to be JSON
            if get_resp.status_code != 200:
                return None
            return get_resp.json()
        except (requests.exceptions.RequestException, ValueError):
            # Callers treat None as "could not talk to the service"
            return None
180
181
183 """! Returns plugin available in this module
184 """
def print_plugin_error(self, text)
Interface helper methods - overload only if you need to have custom behaviour.
def check_parameters(self, capability, *args, **kwargs)
This function should be run each time we call execute() to check if none of the required parameters i...
def setup(self, *args, **kwargs)
Configure plugin, this function should be called before plugin execute() method is used.
def load_plugin()
Returns plugin available in this module.