Skip to content

Atomic Operator

AtomicOperator

Bases: Base

Main class used to run Atomic Red Team tests.

atomic-operator is used to run Atomic Red Team tests both locally and remotely. These tests (atomics) are predefined tests to mock or emulate a specific technique.

config_file definition

atomic-operator's run method can be supplied with a path to a configuration file (config_file) which defines specific tests and/or values for input parameters to facilitate automation of said tests. An example of this config_file can be seen below:

inventory:
  linux1:
    executor: ssh
    authentication:
      username: root
      password: Somepassword!
      #ssk_key_path:
      port: 22
      timeout: 5
    hosts:
      # - 192.168.1.1
      - 10.32.100.199
      # etc.
atomic_tests:
  - guid: f7e6ec05-c19e-4a80-a7e7-241027992fdb
    input_arguments:
      output_file:
        value: custom_output.txt
      input_file:
        value: custom_input.txt
  - guid: 3ff64f0b-3af2-3866-339d-38d9791407c3
    input_arguments:
      second_arg:
        value: SWAPPPED argument
  - guid: 32f90516-4bc9-43bd-b18d-2cbe0b7ca9b2
    inventories:
      - linux1

Raises:

Type Description
ValueError

If a provided technique is unknown we raise an error.

Source code in atomic_operator/atomic_operator.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
class AtomicOperator(Base):

    """Main class used to run Atomic Red Team tests.

    atomic-operator is used to run Atomic Red Team tests both locally and remotely.
    These tests (atomics) are predefined tests to mock or emulate a specific technique.

    config_file definition:
            atomic-operator's run method can be supplied with a path to a configuration file (config_file) which defines 
            specific tests and/or values for input parameters to facilitate automation of said tests.
            An example of this config_file can be seen below:

                inventory:
                  linux1:
                    executor: ssh
                    authentication:
                      username: root
                      password: Somepassword!
                      #ssk_key_path:
                      port: 22
                      timeout: 5
                    hosts:
                      # - 192.168.1.1
                      - 10.32.100.199
                      # etc.
                atomic_tests:
                  - guid: f7e6ec05-c19e-4a80-a7e7-241027992fdb
                    input_arguments:
                      output_file:
                        value: custom_output.txt
                      input_file:
                        value: custom_input.txt
                  - guid: 3ff64f0b-3af2-3866-339d-38d9791407c3
                    input_arguments:
                      second_arg:
                        value: SWAPPPED argument
                  - guid: 32f90516-4bc9-43bd-b18d-2cbe0b7ca9b2
                    inventories:
                      - linux1

    Raises:
        ValueError: If a provided technique is unknown we raise an error.
    """

    __test_responses = {}

    def __find_path(self, value):
        """Attempts to find a path containing the atomic-red-team repository

        Args:
            value (str): A starting path to iterate through

        Returns:
            str: An absolute path containing the path to the atomic-red-team repo
        """
        if value == os.getcwd():
            for x in os.listdir(value):
                if os.path.isdir(x) and 'redcanaryco-atomic-red-team' in x:
                    if os.path.exists(self.get_abs_path(os.path.join(x, 'atomics'))):
                        return self.get_abs_path(os.path.join(x, 'atomics'))
        else:
            if os.path.exists(self.get_abs_path(value)):
                return self.get_abs_path(value)

    def __check_arguments(self, kwargs, method):
        if kwargs:
            for arguments in inspect.getfullargspec(method):
                if isinstance(arguments, list):
                    for arg in arguments:
                        for key,val in kwargs.items():
                            if key in arg:
                                return IncorrectParameters(f"You passed in an argument of '{key}' which is not recognized. Did you mean '{arg}'?")
            return IncorrectParameters(f"You passed in an argument of '{key}' which is not recognized.")

    def __run_technique(self, technique, **kwargs):
        """This method is used to run defined Atomic tests within 
           a MITRE ATT&CK Technique.

        Args:
            technique (Atomic): An Atomic object which contains a list of AtomicTest
                                objects.
        """
        self.__logger.debug(f"Checking technique {technique.attack_technique} ({technique.display_name}) for applicable tests.")
        for test in technique.atomic_tests:
            self._set_input_arguments(test, **kwargs)
            if test.auto_generated_guid not in self.__test_responses:
                self.__test_responses[test.auto_generated_guid] = {}
            if technique.hosts:
                for host in technique.hosts:
                    self.__logger.info(f"Running {test.name} test ({test.auto_generated_guid}) for technique {technique.attack_technique}")
                    self.__logger.debug(f"Description: {test.description}")
                    if test.executor.name in ['sh', 'bash']:
                        self.__test_responses[test.auto_generated_guid].update(
                            RemoteRunner(test, technique.path).start(host=host, executor='ssh')
                        )
                    elif test.executor.name in ['command_prompt']:
                        self.__test_responses[test.auto_generated_guid].update(
                            RemoteRunner(test, technique.path).start(host=host, executor='cmd')
                        )
                    elif test.executor.name in ['powershell']:
                        self.__test_responses[test.auto_generated_guid].update(
                            RemoteRunner(test, technique.path).start(host=host, executor='powershell')
                        )
                    else:
                        self.__logger.warning(f"Unable to execute test since the executor is {test.executor.name}. Skipping.....")
            else:
                if self._check_platform(test, show_output=True):
                    self.__logger.info(f"Running {test.name} test ({test.auto_generated_guid}) for technique {technique.attack_technique}")
                    self.__logger.debug(f"Description: {test.description}")
                    if self._check_if_aws(test):
                        self.__test_responses[test.auto_generated_guid].update(
                            AWSRunner(test, technique.path).start()
                        )
                    else:
                        self.__test_responses[test.auto_generated_guid].update(
                            LocalRunner(test, technique.path).start()
                        )
            if self.__test_responses.get(test.auto_generated_guid):
                self.__test_responses[test.auto_generated_guid].update({
                    'technique_id': technique.attack_technique,
                    'technique_name': technique.display_name
                })

    def help(self, method=None):
        from fire.trace import FireTrace
        from fire.helptext import HelpText
        obj = AtomicOperator if not method else getattr(self, method)
        return HelpText(self.run,trace=FireTrace(obj))

    def get_atomics(self, desintation=os.getcwd(), **kwargs):
        """Downloads the RedCanary atomic-red-team repository to your local system.

        Args:
            desintation (str, optional): A folder path to download the repositorty data to. Defaults to os.getcwd().
            kwargs (dict, optional): This kwargs will be passed along to Python requests library during download. Defaults to None.

        Returns:
            str: The path the data can be found at.
        """
        if not os.path.exists(desintation):
            os.makedirs(desintation)
        desintation = kwargs.pop('destination') if kwargs.get('destination') else desintation
        folder_name = self.download_atomic_red_team_repo(
            save_path=desintation, 
            **kwargs
        )
        return os.path.join(desintation, folder_name)

    def run(self, techniques: list=['all'], test_guids: list=[], select_tests=False,
                  atomics_path=os.getcwd(), check_prereqs=False, get_prereqs=False, 
                  cleanup=False, copy_source_files=True,command_timeout=20, debug=False, 
                  prompt_for_input_args=False, return_atomics=False, config_file=None, 
                  config_file_only=False, hosts=[], username=None, password=None, 
                  ssh_key_path=None, private_key_string=None, verify_ssl=False, 
                  ssh_port=22, ssh_timeout=5, *args, **kwargs) -> None:
        """The main method in which we run Atomic Red Team tests.

        Args:
            techniques (list, optional): One or more defined techniques by attack_technique ID. Defaults to 'all'.
            test_guids (list, optional): One or more Atomic test GUIDs. Defaults to None.
            select_tests (bool, optional): Select one or more tests from provided techniques. Defaults to False.
            atomics_path (str, optional): The path of Atomic tests. Defaults to os.getcwd().
            check_prereqs (bool, optional): Whether or not to check for prereq dependencies (prereq_comand). Defaults to False.
            get_prereqs (bool, optional): Whether or not you want to retrieve prerequisites. Defaults to False.
            cleanup (bool, optional): Whether or not you want to run cleanup command(s). Defaults to False.
            copy_source_files (bool, optional): Whether or not you want to copy any related source (src, bin, etc.) files to a remote host. Defaults to True.
            command_timeout (int, optional): Timeout duration for each command. Defaults to 20.
            debug (bool, optional): Whether or not you want to output details about tests being ran. Defaults to False.
            prompt_for_input_args (bool, optional): Whether you want to prompt for input arguments for each test. Defaults to False.
            return_atomics (bool, optional): Whether or not you want to return atomics instead of running them. Defaults to False.
            config_file (str, optional): A path to a conifg_file which is used to automate atomic-operator in environments. Default to None.
            config_file_only (bool, optional): Whether or not you want to run tests based on the provided config_file only. Defaults to False.
            hosts (list, optional): A list of one or more remote hosts to run a test on. Defaults to [].
            username (str, optional): Username for authentication of remote connections. Defaults to None.
            password (str, optional): Password for authentication of remote connections. Defaults to None.
            ssh_key_path (str, optional): Path to a SSH Key for authentication of remote connections. Defaults to None.
            private_key_string (str, optional): A private SSH Key string used for authentication of remote connections. Defaults to None.
            verify_ssl (bool, optional): Whether or not to verify ssl when connecting over RDP (windows). Defaults to False.
            ssh_port (int, optional): SSH port for authentication of remote connections. Defaults to 22.
            ssh_timeout (int, optional): SSH timeout for authentication of remote connections. Defaults to 5.
            kwargs (dict, optional): If provided, keys matching inputs for a test will be replaced. Default is None.

        Raises:
            ValueError: If a provided technique is unknown we raise an error.
        """
        response = self.__check_arguments(kwargs, self.run)
        if response:
            return response
        if kwargs.get('help'):
            return self.help(method='run')
        if debug:
            import logging
            logging.getLogger().setLevel(logging.DEBUG)
        count = 0
        if check_prereqs:
            count += 1
        if get_prereqs:
            count += 1
        if cleanup:
            count += 1
        if count > 1:
            return IncorrectParameters(f"You have passed in incompatible arguments. Please only provide one of 'check_prereqs','get_prereqs','cleanup'.")
        atomics_path = self.__find_path(atomics_path)
        if not atomics_path:
            return AtomicsFolderNotFound('Unable to find a folder containing Atomics. Please provide a path or run get_atomics.')
        Base.CONFIG = Config(
            atomics_path          = atomics_path,
            check_prereqs         = check_prereqs,
            get_prereqs           = get_prereqs,
            cleanup               = cleanup,
            command_timeout       = command_timeout,
            debug                 = debug,
            prompt_for_input_args = prompt_for_input_args,
            kwargs                = kwargs,
            copy_source_files     = copy_source_files
        )
        # taking inputs from both config_file and passed in values via command
        # line to build a run_list of objects
        self.__config_parser = ConfigParser(
                config_file=config_file,
                techniques=None if config_file_only else self.parse_input_lists(techniques),
                test_guids=None if config_file_only else self.parse_input_lists(test_guids),
                host_list=None if config_file_only else self.parse_input_lists(hosts),
                username=username,
                password=password,
                ssh_key_path=ssh_key_path,
                private_key_string=private_key_string,
                verify_ssl=verify_ssl,
                ssh_port=ssh_port,
                ssh_timeout=ssh_timeout,
                select_tests=select_tests
            )
        self.__run_list = self.__config_parser.run_list

        __return_atomics = []
        for item in self.__run_list:
            if return_atomics:
                __return_atomics.append(item)
            elif kwargs.get('kwargs'):
                self.__run_technique(item, **kwargs.get('kwargs'))
            else:
                self.__run_technique(item)
        if return_atomics and __return_atomics:
            return __return_atomics
        return self.__test_responses

get_atomics(desintation=os.getcwd(), **kwargs)

Downloads the RedCanary atomic-red-team repository to your local system.

Parameters:

Name Type Description Default
desintation str

A folder path to download the repositorty data to. Defaults to os.getcwd().

os.getcwd()
kwargs dict

This kwargs will be passed along to Python requests library during download. Defaults to None.

required

Returns:

Name Type Description
str

The path the data can be found at.

Source code in atomic_operator/atomic_operator.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def get_atomics(self, desintation=os.getcwd(), **kwargs):
    """Downloads the RedCanary atomic-red-team repository to your local system.

    Args:
        desintation (str, optional): A folder path to download the repositorty data to. Defaults to os.getcwd().
        kwargs (dict, optional): This kwargs will be passed along to Python requests library during download. Defaults to None.

    Returns:
        str: The path the data can be found at.
    """
    if not os.path.exists(desintation):
        os.makedirs(desintation)
    desintation = kwargs.pop('destination') if kwargs.get('destination') else desintation
    folder_name = self.download_atomic_red_team_repo(
        save_path=desintation, 
        **kwargs
    )
    return os.path.join(desintation, folder_name)

run(techniques=['all'], test_guids=[], select_tests=False, atomics_path=os.getcwd(), check_prereqs=False, get_prereqs=False, cleanup=False, copy_source_files=True, command_timeout=20, debug=False, prompt_for_input_args=False, return_atomics=False, config_file=None, config_file_only=False, hosts=[], username=None, password=None, ssh_key_path=None, private_key_string=None, verify_ssl=False, ssh_port=22, ssh_timeout=5, *args, **kwargs)

The main method in which we run Atomic Red Team tests.

Parameters:

Name Type Description Default
techniques list

One or more defined techniques by attack_technique ID. Defaults to 'all'.

['all']
test_guids list

One or more Atomic test GUIDs. Defaults to None.

[]
select_tests bool

Select one or more tests from provided techniques. Defaults to False.

False
atomics_path str

The path of Atomic tests. Defaults to os.getcwd().

os.getcwd()
check_prereqs bool

Whether or not to check for prereq dependencies (prereq_comand). Defaults to False.

False
get_prereqs bool

Whether or not you want to retrieve prerequisites. Defaults to False.

False
cleanup bool

Whether or not you want to run cleanup command(s). Defaults to False.

False
copy_source_files bool

Whether or not you want to copy any related source (src, bin, etc.) files to a remote host. Defaults to True.

True
command_timeout int

Timeout duration for each command. Defaults to 20.

20
debug bool

Whether or not you want to output details about tests being ran. Defaults to False.

False
prompt_for_input_args bool

Whether you want to prompt for input arguments for each test. Defaults to False.

False
return_atomics bool

Whether or not you want to return atomics instead of running them. Defaults to False.

False
config_file str

A path to a conifg_file which is used to automate atomic-operator in environments. Default to None.

None
config_file_only bool

Whether or not you want to run tests based on the provided config_file only. Defaults to False.

False
hosts list

A list of one or more remote hosts to run a test on. Defaults to [].

[]
username str

Username for authentication of remote connections. Defaults to None.

None
password str

Password for authentication of remote connections. Defaults to None.

None
ssh_key_path str

Path to a SSH Key for authentication of remote connections. Defaults to None.

None
private_key_string str

A private SSH Key string used for authentication of remote connections. Defaults to None.

None
verify_ssl bool

Whether or not to verify ssl when connecting over RDP (windows). Defaults to False.

False
ssh_port int

SSH port for authentication of remote connections. Defaults to 22.

22
ssh_timeout int

SSH timeout for authentication of remote connections. Defaults to 5.

5
kwargs dict

If provided, keys matching inputs for a test will be replaced. Default is None.

required

Raises:

Type Description
ValueError

If a provided technique is unknown we raise an error.

Source code in atomic_operator/atomic_operator.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
def run(self, techniques: list=['all'], test_guids: list=[], select_tests=False,
              atomics_path=os.getcwd(), check_prereqs=False, get_prereqs=False, 
              cleanup=False, copy_source_files=True,command_timeout=20, debug=False, 
              prompt_for_input_args=False, return_atomics=False, config_file=None, 
              config_file_only=False, hosts=[], username=None, password=None, 
              ssh_key_path=None, private_key_string=None, verify_ssl=False, 
              ssh_port=22, ssh_timeout=5, *args, **kwargs) -> None:
    """The main method in which we run Atomic Red Team tests.

    Args:
        techniques (list, optional): One or more defined techniques by attack_technique ID. Defaults to 'all'.
        test_guids (list, optional): One or more Atomic test GUIDs. Defaults to None.
        select_tests (bool, optional): Select one or more tests from provided techniques. Defaults to False.
        atomics_path (str, optional): The path of Atomic tests. Defaults to os.getcwd().
        check_prereqs (bool, optional): Whether or not to check for prereq dependencies (prereq_comand). Defaults to False.
        get_prereqs (bool, optional): Whether or not you want to retrieve prerequisites. Defaults to False.
        cleanup (bool, optional): Whether or not you want to run cleanup command(s). Defaults to False.
        copy_source_files (bool, optional): Whether or not you want to copy any related source (src, bin, etc.) files to a remote host. Defaults to True.
        command_timeout (int, optional): Timeout duration for each command. Defaults to 20.
        debug (bool, optional): Whether or not you want to output details about tests being ran. Defaults to False.
        prompt_for_input_args (bool, optional): Whether you want to prompt for input arguments for each test. Defaults to False.
        return_atomics (bool, optional): Whether or not you want to return atomics instead of running them. Defaults to False.
        config_file (str, optional): A path to a conifg_file which is used to automate atomic-operator in environments. Default to None.
        config_file_only (bool, optional): Whether or not you want to run tests based on the provided config_file only. Defaults to False.
        hosts (list, optional): A list of one or more remote hosts to run a test on. Defaults to [].
        username (str, optional): Username for authentication of remote connections. Defaults to None.
        password (str, optional): Password for authentication of remote connections. Defaults to None.
        ssh_key_path (str, optional): Path to a SSH Key for authentication of remote connections. Defaults to None.
        private_key_string (str, optional): A private SSH Key string used for authentication of remote connections. Defaults to None.
        verify_ssl (bool, optional): Whether or not to verify ssl when connecting over RDP (windows). Defaults to False.
        ssh_port (int, optional): SSH port for authentication of remote connections. Defaults to 22.
        ssh_timeout (int, optional): SSH timeout for authentication of remote connections. Defaults to 5.
        kwargs (dict, optional): If provided, keys matching inputs for a test will be replaced. Default is None.

    Raises:
        ValueError: If a provided technique is unknown we raise an error.
    """
    response = self.__check_arguments(kwargs, self.run)
    if response:
        return response
    if kwargs.get('help'):
        return self.help(method='run')
    if debug:
        import logging
        logging.getLogger().setLevel(logging.DEBUG)
    count = 0
    if check_prereqs:
        count += 1
    if get_prereqs:
        count += 1
    if cleanup:
        count += 1
    if count > 1:
        return IncorrectParameters(f"You have passed in incompatible arguments. Please only provide one of 'check_prereqs','get_prereqs','cleanup'.")
    atomics_path = self.__find_path(atomics_path)
    if not atomics_path:
        return AtomicsFolderNotFound('Unable to find a folder containing Atomics. Please provide a path or run get_atomics.')
    Base.CONFIG = Config(
        atomics_path          = atomics_path,
        check_prereqs         = check_prereqs,
        get_prereqs           = get_prereqs,
        cleanup               = cleanup,
        command_timeout       = command_timeout,
        debug                 = debug,
        prompt_for_input_args = prompt_for_input_args,
        kwargs                = kwargs,
        copy_source_files     = copy_source_files
    )
    # taking inputs from both config_file and passed in values via command
    # line to build a run_list of objects
    self.__config_parser = ConfigParser(
            config_file=config_file,
            techniques=None if config_file_only else self.parse_input_lists(techniques),
            test_guids=None if config_file_only else self.parse_input_lists(test_guids),
            host_list=None if config_file_only else self.parse_input_lists(hosts),
            username=username,
            password=password,
            ssh_key_path=ssh_key_path,
            private_key_string=private_key_string,
            verify_ssl=verify_ssl,
            ssh_port=ssh_port,
            ssh_timeout=ssh_timeout,
            select_tests=select_tests
        )
    self.__run_list = self.__config_parser.run_list

    __return_atomics = []
    for item in self.__run_list:
        if return_atomics:
            __return_atomics.append(item)
        elif kwargs.get('kwargs'):
            self.__run_technique(item, **kwargs.get('kwargs'))
        else:
            self.__run_technique(item)
    if return_atomics and __return_atomics:
        return __return_atomics
    return self.__test_responses

Base

Source code in atomic_operator/base.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
class Base(metaclass=LoggingBase):

    CONFIG = None
    ATOMIC_RED_TEAM_REPO = 'https://github.com/redcanaryco/atomic-red-team/zipball/master/'
    command_map = {
        'command_prompt': {
            'windows': 'C:\\Windows\\System32\\cmd.exe',
            'linux': '/bin/sh',
            'macos': '/bin/sh',
            'default': '/bin/sh'
        },
        'powershell': {
            'windows': 'C:\\Windows\\System32\\WindowsPowerShell\\v1.0\\powershell.exe'
        },
        'sh': {
            'linux': '/bin/sh',
            'macos': '/bin/sh'
        },
        'bash': {
            'linux': '/bin/bash',
            'macos': '/bin/bash'
        }
    }
    VARIABLE_REPLACEMENTS = {
        'command_prompt': {
            '%temp%': "$env:TEMP"
        }
    }
    _replacement_strings = [
        '#{{{0}}}',
        '${{{0}}}'
    ]

    def download_atomic_red_team_repo(self, save_path, **kwargs) -> str:
        """Downloads the Atomic Red Team repository from github

        Args:
            save_path (str): The path to save the downloaded and extracted ZIP contents

        Returns:
            str: A string of the location the data was saved to.
        """
        response = requests.get(Base.ATOMIC_RED_TEAM_REPO, stream=True, **kwargs)
        z = zipfile.ZipFile(BytesIO(response.content))
        with zipfile.ZipFile(BytesIO(response.content)) as zf:
            for member in zf.infolist():
                file_path = os.path.realpath(os.path.join(save_path, member.filename))
                if file_path.startswith(os.path.realpath(save_path)):
                    zf.extract(member, save_path)
        return z.namelist()[0]

    def get_local_system_platform(self) -> str:
        """Identifies the local systems operating system platform

        Returns:
            str: The current/local systems operating system platform
        """
        os_name = platform.system().lower()
        if os_name == "darwin":
            return "macos"
        return os_name

    def get_abs_path(self, value) -> str:
        """Formats and returns the absolute path for a path value

        Args:
            value (str): A path string in many different accepted formats

        Returns:
            str: The absolute path of the provided string
        """
        return os.path.abspath(os.path.expanduser(os.path.expandvars(value)))

    def prompt_user_for_input(self, title, input_object):
        """Prompts user for input values based on the provided values.
        """
        print(f"""
Inputs for {title}:
    Input Name: {input_object.name}
    Default:     {input_object.default}
    Description: {input_object.description}
""")
        print(f"Please provide a value for {input_object.name} (If blank, default is used):",)
        value = sys.stdin.readline()
        if bool(value):
            return value
        return input_object.default

    def parse_input_lists(self, value):
        value_list = None
        if not isinstance(value, list):
            value_list = set([t.strip() for t in value.split(',')])
        else:
            value_list = set(value)
        return list(value_list)

    def _path_replacement(self, string, path):
        try:
            string = string.replace('$PathToAtomicsFolder', path)
        except:
            pass
        try:
            string = string.replace('PathToAtomicsFolder', path)
        except:
            pass
        return string

    def _replace_command_string(self, command: str, path:str, input_arguments: list=[], executor=None):
        if command:
            command = self._path_replacement(command, path)
            if input_arguments:
                for input in input_arguments:
                    for string in self._replacement_strings:
                        try:
                            command = command.replace(str(string.format(input.name)), str(input.value))
                        except:
                            # catching errors since some inputs are actually integers but defined as strings
                            pass
                    if executor and self.VARIABLE_REPLACEMENTS.get(executor):
                        for key,val in self.VARIABLE_REPLACEMENTS[executor].items():
                            try:
                                command = command.replace(key, val)
                            except:
                                pass
        return self._path_replacement(command, path)

    def _check_if_aws(self, test):
        if 'iaas:aws' in test.supported_platforms and self.get_local_system_platform() in ['macos', 'linux']:
            return True
        return False

    def _check_platform(self, test, show_output=False) -> bool:
        if self._check_if_aws(test):
            return True
        if test.supported_platforms and self.get_local_system_platform() not in test.supported_platforms:
            self.__logger.info(f"You provided a test ({test.auto_generated_guid}) '{test.name}' which is not supported on this platform. Skipping...")
            return False
        return True

    def _set_input_arguments(self, test, **kwargs):
        if test.input_arguments:
            if kwargs:
                for input in test.input_arguments:
                    for key,val in kwargs.items():
                        if input.name == key:
                            input.value = val
            if Base.CONFIG.prompt_for_input_args:
                for input in test.input_arguments:
                    input.value = self.prompt_user_for_input(test.name, input)
            for key,val in self.VARIABLE_REPLACEMENTS.items():
                if test.executor.name == key:
                    for k,v in val.items():
                        for input in test.input_arguments:
                            if k in input.default:
                                input.value = input.default.replace(k,v)
            for input in test.input_arguments:
                if input.value == None:
                    input.value = input.default

    def select_atomic_tests(self, technique):
        options = None
        test_list = []
        for test in technique.atomic_tests:
            test_list.append(test)
        if test_list:
            options = pick(
                test_list, 
                title=f"Select Test(s) for Technique {technique.attack_technique} ({technique.display_name})", 
                multiselect=True, 
                options_map_func=self.format_pick_options
            )
        return [i[0] for i in options] if options else []

    def format_pick_options(self, option):
        return f"{option.name} ({option.auto_generated_guid})"

download_atomic_red_team_repo(save_path, **kwargs)

Downloads the Atomic Red Team repository from github

Parameters:

Name Type Description Default
save_path str

The path to save the downloaded and extracted ZIP contents

required

Returns:

Name Type Description
str str

A string of the location the data was saved to.

Source code in atomic_operator/base.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def download_atomic_red_team_repo(self, save_path, **kwargs) -> str:
    """Downloads the Atomic Red Team repository from github

    Args:
        save_path (str): The path to save the downloaded and extracted ZIP contents

    Returns:
        str: A string of the location the data was saved to.
    """
    response = requests.get(Base.ATOMIC_RED_TEAM_REPO, stream=True, **kwargs)
    z = zipfile.ZipFile(BytesIO(response.content))
    with zipfile.ZipFile(BytesIO(response.content)) as zf:
        for member in zf.infolist():
            file_path = os.path.realpath(os.path.join(save_path, member.filename))
            if file_path.startswith(os.path.realpath(save_path)):
                zf.extract(member, save_path)
    return z.namelist()[0]

get_abs_path(value)

Formats and returns the absolute path for a path value

Parameters:

Name Type Description Default
value str

A path string in many different accepted formats

required

Returns:

Name Type Description
str str

The absolute path of the provided string

Source code in atomic_operator/base.py
73
74
75
76
77
78
79
80
81
82
def get_abs_path(self, value) -> str:
    """Formats and returns the absolute path for a path value

    Args:
        value (str): A path string in many different accepted formats

    Returns:
        str: The absolute path of the provided string
    """
    return os.path.abspath(os.path.expanduser(os.path.expandvars(value)))

get_local_system_platform()

Identifies the local systems operating system platform

Returns:

Name Type Description
str str

The current/local systems operating system platform

Source code in atomic_operator/base.py
62
63
64
65
66
67
68
69
70
71
def get_local_system_platform(self) -> str:
    """Identifies the local systems operating system platform

    Returns:
        str: The current/local systems operating system platform
    """
    os_name = platform.system().lower()
    if os_name == "darwin":
        return "macos"
    return os_name

prompt_user_for_input(title, input_object)

Prompts user for input values based on the provided values.

Source code in atomic_operator/base.py
84
85
86
87
88
89
90
91
92
93
94
95
96
97
    def prompt_user_for_input(self, title, input_object):
        """Prompts user for input values based on the provided values.
        """
        print(f"""
Inputs for {title}:
    Input Name: {input_object.name}
    Default:     {input_object.default}
    Description: {input_object.description}
""")
        print(f"Please provide a value for {input_object.name} (If blank, default is used):",)
        value = sys.stdin.readline()
        if bool(value):
            return value
        return input_object.default

ConfigParser

Bases: Base

Source code in atomic_operator/configparser.py
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
class ConfigParser(Base):

    def __init__(self, config_file=None, techniques=None, test_guids=None, 
                       host_list=None, username=None, password=None,
                       ssh_key_path=None, private_key_string=None, verify_ssl=False,
                       ssh_port=22, ssh_timeout=5, select_tests=False
                ):
        """Parses a provided config file as well as parameters to build a run list

        This list combines Atomics and potentially filters 
        tests defined within that Atomic object based on passed
        in parameters and config_file.

        Additionally, a list of Host objects are added to their
        defined techniques or test_guids based on config and/or
        passed in parameters.

        Example: Example structure returned from provided values
        [
            Atomic(
                attack_technique='T1016', 
                display_name='System Network Configuration Discovery', 
                path='/Users/josh.rickard/_Swimlane2/atomic-operator/redcanaryco-atomic-red-team-22dd2fb/atomics/T1016', 
                atomic_tests=[
                    AtomicTest(
                        name='System Network Configuration Discovery', 
                        description='Identify network configuration information.\n\nUpon successful execution, ...', 
                        supported_platforms=['macos', 'linux'], 
                        auto_generated_guid='c141bbdb-7fca-4254-9fd6-f47e79447e17', 
                        executor=AtomicExecutor(
                            name='sh', 
                            command='if [ -x "$(command -v arp)" ]; then arp -a; else echo "arp is missing from ....', 
                            cleanup_command=None, 
                            elevation_required=False, steps=None
                        ), 
                        input_arguments=None, 
                        dependency_executor_name=None, 
                        dependencies=[]
                    )
                ], 
                hosts=[
                    Host(
                        hostname='192.168.1.1', 
                        username='username', 
                        password='some_passowrd!', 
                        verify_ssl=False, 
                        ssh_key_path=None, 
                        private_key_string=None, 
                        port=22, 
                        timeout=5
                    )
                ],
                supporting_files=[
                    'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/top-128.txt', 
                    'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/qakbot.bat'
                ]
            )
        ]
        """
        self.__config_file = self.__load_config(config_file)
        self.techniques = techniques
        self.test_guids = test_guids
        self.select_tests = select_tests
        self.__host_list = []
        if host_list:
            for host in self.parse_input_lists(host_list):
                self.__host_list.append(self.__create_remote_host_object(
                    hostname=host,
                    username=username,
                    password=password,
                    ssh_key_path=ssh_key_path,
                    private_key_string=private_key_string,
                    verify_ssl=verify_ssl,
                    ssh_port=ssh_port,
                    ssh_timeout=ssh_timeout
                ))

    def __load_config(self, config_file):
        if config_file and self.get_abs_path(config_file):
            config_file = self.get_abs_path(config_file)
            if not os.path.exists(config_file):
                raise FileNotFoundError('Please provide a config_file path that exists')
            from .atomic.loader import Loader
            config = Loader().load_technique(config_file)
            if not config.get('atomic_tests') and not isinstance(config, list):
                raise MalformedFile('Please provide one or more atomic_tests within your config_file')
            return config
        return {}

    def __parse_hosts(self, inventory):
        host_list = []
        for host in inventory.get('hosts'):
            inputs = inventory['authentication']
            host_list.append(
                self.__create_remote_host_object(
                    hostname=host,
                    username=inputs['username'] if inputs.get('username') else None,
                    password=inputs['password'] if inputs.get('password') else None,
                    ssh_key_path=inputs['ssh_key_path'] if inputs.get('ssh_key_path') else None,
                    private_key_string=inputs['private_key_string'] if inputs.get('private_key_string') else None,
                    verify_ssl=inputs['verify_ssl'] if inputs.get('verify_ssl') else False,
                    ssh_port=inputs['port'] if inputs.get('port') else 22,
                    ssh_timeout=inputs['timeout'] if inputs.get('timeout') else 5
                )
            )
        return host_list

    def __create_remote_host_object(self, 
        hostname=None,
        username=None,
        password=None,
        ssh_key_path=None,
        private_key_string=None,
        verify_ssl=False,
        ssh_port=22,
        ssh_timeout=5):
            return Host(
                hostname=hostname,
                username=username,
                password=password,
                ssh_key_path=ssh_key_path,
                private_key_string=private_key_string,
                verify_ssl=verify_ssl,
                port=ssh_port,
                timeout=ssh_timeout
            )

    def __parse_test_guids(self, _config_file):
        test_dict = {}
        return_list = []
        if _config_file:
            for item in _config_file['atomic_tests']:
                if item.get('guid'):
                    if item['guid'] not in test_dict:
                        test_dict[item['guid']] = []
                    if item.get('inventories') and _config_file.get('inventory'):
                        # process inventories to run commands remotely
                        for inventory in item['inventories']:
                            if _config_file['inventory'].get(inventory):
                                test_dict[item['guid']] = self.__parse_hosts(_config_file['inventory'][inventory])
        if test_dict:
            for key,val in test_dict.items():
                for item in self.__build_run_list(
                    test_guids=[key],
                    host_list=val
                    ):
                    return_list.append(item)
        return return_list

    def __build_run_list(self, techniques=None, test_guids=None, host_list=None, select_tests=False):
        __run_list = []
        self.__loaded_techniques = Loader().load_techniques()
        if test_guids:
            for key,val in self.__loaded_techniques.items():
                test_list = []
                for test in val.atomic_tests:
                    if test.auto_generated_guid in test_guids:
                        test_list.append(test)
                if test_list:
                    temp = self.__loaded_techniques[key]
                    temp.atomic_tests = test_list
                    temp.hosts = host_list
                    __run_list.append(temp)
        if techniques:
            if 'all' not in techniques:
                for technique in techniques:
                    if self.__loaded_techniques.get(technique):
                        temp = self.__loaded_techniques[technique]
                        if select_tests:
                            temp.atomic_tests = self.select_atomic_tests(
                                self.__loaded_techniques[technique]
                            )
                        temp.hosts = host_list
                        __run_list.append(temp)
            elif 'all' in techniques and not test_guids:
                for key,val in self.__loaded_techniques.items():
                    temp = self.__loaded_techniques[key]
                    if select_tests:
                            temp.atomic_tests = self.select_atomic_tests(
                                self.__loaded_techniques[key]
                            )
                    temp.hosts = host_list
                    __run_list.append(temp)
            else:
                pass
        return __run_list

    @property
    def run_list(self):
        """Returns a list of Atomic objects that will be ran.

        This list combines Atomics and potentially filters 
        tests defined within that Atomic object based on passed
        in parameters and config_file.

        Additionally, a list of Host objects are added to their
        defined techniques or test_guids based on config and/or
        passed in parameters.

        [
            Atomic(
                attack_technique='T1016', 
                display_name='System Network Configuration Discovery', 
                path='/Users/josh.rickard/_Swimlane2/atomic-operator/redcanaryco-atomic-red-team-22dd2fb/atomics/T1016', 
                atomic_tests=[
                    AtomicTest(
                        name='System Network Configuration Discovery', 
                        description='Identify network configuration information.\n\nUpon successful execution, ...', 
                        supported_platforms=['macos', 'linux'], 
                        auto_generated_guid='c141bbdb-7fca-4254-9fd6-f47e79447e17', 
                        executor=AtomicExecutor(
                            name='sh', 
                            command='if [ -x "$(command -v arp)" ]; then arp -a; else echo "arp is missing from ....', 
                            cleanup_command=None, 
                            elevation_required=False, steps=None
                        ), 
                        input_arguments=None, 
                        dependency_executor_name=None, 
                        dependencies=[]
                    )
                ], 
                hosts=[
                    Host(
                        hostname='192.168.1.1', 
                        username='username', 
                        password='some_passowrd!', 
                        verify_ssl=False, 
                        ssh_key_path=None, 
                        private_key_string=None, 
                        port=22, 
                        timeout=5
                    )
                ],
                supporting_files=[
                    'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/top-128.txt', 
                    'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/qakbot.bat'
                ]
            )
        ]

        Returns:
            [list]: A list of modified Atomic objects that will be used to run 
                    either remotely or locally.
        """
        __run_list = []
        if self.__config_file:
            __run_list = self.__parse_test_guids(self.__config_file)

        for item in self.__build_run_list(
            techniques=self.parse_input_lists(self.techniques) if self.techniques else [],
            test_guids=self.parse_input_lists(self.test_guids) if self.test_guids else [],
            host_list=self.__host_list,
            select_tests=self.select_tests
            ):
            __run_list.append(item)
        return __run_list

    @property
    def config(self):
        """Returns raw converted config_file passed into class

        Returns:
            [dict]: Returns the converted config_file as dictionary.
        """
        if self.__config_file:
            return self.__config_file
        else:
            return None

    def is_defined(self, guid: str):
        """Checks to see if a GUID is defined within a config file

        Args:
            guid (str): The GUID defined within a parsed config file

        Returns:
            [bool]: Returns True if GUID is defined within parsed config_file
        """
        if self.__config_file:
            for item in self.__config_file['atomic_tests']:
                if item['guid'] == guid:
                    return True
        return False

    def get_inputs(self, guid: str): 
        """Retrieves any defined inputs for a given atomic test GUID

        Args:
            guid (str): An Atomic test GUID

        Returns:
            dict: A dictionary of defined input arguments or empty
        """
        if self.__config_file:
            for item in self.__config_file['atomic_tests']:
                if item['guid'] == guid:
                    return item.get('input_arguments', {})
        return {}

__init__(config_file=None, techniques=None, test_guids=None, host_list=None, username=None, password=None, ssh_key_path=None, private_key_string=None, verify_ssl=False, ssh_port=22, ssh_timeout=5, select_tests=False)

Parses a provided config file as well as parameters to build a run list

    This list combines Atomics and potentially filters 
    tests defined within that Atomic object based on passed
    in parameters and config_file.

    Additionally, a list of Host objects are added to their
    defined techniques or test_guids based on config and/or
    passed in parameters.

    Example: Example structure returned from provided values
    [
        Atomic(
            attack_technique='T1016', 
            display_name='System Network Configuration Discovery', 
            path='/Users/josh.rickard/_Swimlane2/atomic-operator/redcanaryco-atomic-red-team-22dd2fb/atomics/T1016', 
            atomic_tests=[
                AtomicTest(
                    name='System Network Configuration Discovery', 
                    description='Identify network configuration information.

Upon successful execution, ...', supported_platforms=['macos', 'linux'], auto_generated_guid='c141bbdb-7fca-4254-9fd6-f47e79447e17', executor=AtomicExecutor( name='sh', command='if [ -x "$(command -v arp)" ]; then arp -a; else echo "arp is missing from ....', cleanup_command=None, elevation_required=False, steps=None ), input_arguments=None, dependency_executor_name=None, dependencies=[] ) ], hosts=[ Host( hostname='192.168.1.1', username='username', password='some_passowrd!', verify_ssl=False, ssh_key_path=None, private_key_string=None, port=22, timeout=5 ) ], supporting_files=[ 'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/top-128.txt', 'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/qakbot.bat' ] ) ]

Source code in atomic_operator/configparser.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def __init__(self, config_file=None, techniques=None, test_guids=None, 
                   host_list=None, username=None, password=None,
                   ssh_key_path=None, private_key_string=None, verify_ssl=False,
                   ssh_port=22, ssh_timeout=5, select_tests=False
            ):
    """Parses a provided config file as well as parameters to build a run list

    This list combines Atomics and potentially filters 
    tests defined within that Atomic object based on passed
    in parameters and config_file.

    Additionally, a list of Host objects are added to their
    defined techniques or test_guids based on config and/or
    passed in parameters.

    Example: Example structure returned from provided values
    [
        Atomic(
            attack_technique='T1016', 
            display_name='System Network Configuration Discovery', 
            path='/Users/josh.rickard/_Swimlane2/atomic-operator/redcanaryco-atomic-red-team-22dd2fb/atomics/T1016', 
            atomic_tests=[
                AtomicTest(
                    name='System Network Configuration Discovery', 
                    description='Identify network configuration information.\n\nUpon successful execution, ...', 
                    supported_platforms=['macos', 'linux'], 
                    auto_generated_guid='c141bbdb-7fca-4254-9fd6-f47e79447e17', 
                    executor=AtomicExecutor(
                        name='sh', 
                        command='if [ -x "$(command -v arp)" ]; then arp -a; else echo "arp is missing from ....', 
                        cleanup_command=None, 
                        elevation_required=False, steps=None
                    ), 
                    input_arguments=None, 
                    dependency_executor_name=None, 
                    dependencies=[]
                )
            ], 
            hosts=[
                Host(
                    hostname='192.168.1.1', 
                    username='username', 
                    password='some_passowrd!', 
                    verify_ssl=False, 
                    ssh_key_path=None, 
                    private_key_string=None, 
                    port=22, 
                    timeout=5
                )
            ],
            supporting_files=[
                'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/top-128.txt', 
                'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/qakbot.bat'
            ]
        )
    ]
    """
    self.__config_file = self.__load_config(config_file)
    self.techniques = techniques
    self.test_guids = test_guids
    self.select_tests = select_tests
    self.__host_list = []
    if host_list:
        for host in self.parse_input_lists(host_list):
            self.__host_list.append(self.__create_remote_host_object(
                hostname=host,
                username=username,
                password=password,
                ssh_key_path=ssh_key_path,
                private_key_string=private_key_string,
                verify_ssl=verify_ssl,
                ssh_port=ssh_port,
                ssh_timeout=ssh_timeout
            ))

config() property

Returns raw converted config_file passed into class

Returns:

Type Description

[dict]: Returns the converted config_file as dictionary.

Source code in atomic_operator/configparser.py
265
266
267
268
269
270
271
272
273
274
275
@property
def config(self):
    """Returns raw converted config_file passed into class

    Returns:
        [dict]: Returns the converted config_file as dictionary.
    """
    if self.__config_file:
        return self.__config_file
    else:
        return None

get_inputs(guid)

Retrieves any defined inputs for a given atomic test GUID

Parameters:

Name Type Description Default
guid str

An Atomic test GUID

required

Returns:

Name Type Description
dict

A dictionary of defined input arguments or empty

Source code in atomic_operator/configparser.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
def get_inputs(self, guid: str): 
    """Retrieves any defined inputs for a given atomic test GUID

    Args:
        guid (str): An Atomic test GUID

    Returns:
        dict: A dictionary of defined input arguments or empty
    """
    if self.__config_file:
        for item in self.__config_file['atomic_tests']:
            if item['guid'] == guid:
                return item.get('input_arguments', {})
    return {}

is_defined(guid)

Checks to see if a GUID is defined within a config file

Parameters:

Name Type Description Default
guid str

The GUID defined within a parsed config file

required

Returns:

Type Description

[bool]: Returns True if GUID is defined within parsed config_file

Source code in atomic_operator/configparser.py
277
278
279
280
281
282
283
284
285
286
287
288
289
290
def is_defined(self, guid: str):
    """Checks to see if a GUID is defined within a config file

    Args:
        guid (str): The GUID defined within a parsed config file

    Returns:
        [bool]: Returns True if GUID is defined within parsed config_file
    """
    if self.__config_file:
        for item in self.__config_file['atomic_tests']:
            if item['guid'] == guid:
                return True
    return False

run_list() property

Returns a list of Atomic objects that will be ran.

    This list combines Atomics and potentially filters 
    tests defined within that Atomic object based on passed
    in parameters and config_file.

    Additionally, a list of Host objects are added to their
    defined techniques or test_guids based on config and/or
    passed in parameters.

    [
        Atomic(
            attack_technique='T1016', 
            display_name='System Network Configuration Discovery', 
            path='/Users/josh.rickard/_Swimlane2/atomic-operator/redcanaryco-atomic-red-team-22dd2fb/atomics/T1016', 
            atomic_tests=[
                AtomicTest(
                    name='System Network Configuration Discovery', 
                    description='Identify network configuration information.

Upon successful execution, ...', supported_platforms=['macos', 'linux'], auto_generated_guid='c141bbdb-7fca-4254-9fd6-f47e79447e17', executor=AtomicExecutor( name='sh', command='if [ -x "$(command -v arp)" ]; then arp -a; else echo "arp is missing from ....', cleanup_command=None, elevation_required=False, steps=None ), input_arguments=None, dependency_executor_name=None, dependencies=[] ) ], hosts=[ Host( hostname='192.168.1.1', username='username', password='some_passowrd!', verify_ssl=False, ssh_key_path=None, private_key_string=None, port=22, timeout=5 ) ], supporting_files=[ 'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/top-128.txt', 'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/qakbot.bat' ] ) ]

    Returns:
        [list]: A list of modified Atomic objects that will be used to run 
                either remotely or locally.
Source code in atomic_operator/configparser.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
@property
def run_list(self):
    """Returns a list of Atomic objects that will be ran.

    This list combines Atomics and potentially filters 
    tests defined within that Atomic object based on passed
    in parameters and config_file.

    Additionally, a list of Host objects are added to their
    defined techniques or test_guids based on config and/or
    passed in parameters.

    [
        Atomic(
            attack_technique='T1016', 
            display_name='System Network Configuration Discovery', 
            path='/Users/josh.rickard/_Swimlane2/atomic-operator/redcanaryco-atomic-red-team-22dd2fb/atomics/T1016', 
            atomic_tests=[
                AtomicTest(
                    name='System Network Configuration Discovery', 
                    description='Identify network configuration information.\n\nUpon successful execution, ...', 
                    supported_platforms=['macos', 'linux'], 
                    auto_generated_guid='c141bbdb-7fca-4254-9fd6-f47e79447e17', 
                    executor=AtomicExecutor(
                        name='sh', 
                        command='if [ -x "$(command -v arp)" ]; then arp -a; else echo "arp is missing from ....', 
                        cleanup_command=None, 
                        elevation_required=False, steps=None
                    ), 
                    input_arguments=None, 
                    dependency_executor_name=None, 
                    dependencies=[]
                )
            ], 
            hosts=[
                Host(
                    hostname='192.168.1.1', 
                    username='username', 
                    password='some_passowrd!', 
                    verify_ssl=False, 
                    ssh_key_path=None, 
                    private_key_string=None, 
                    port=22, 
                    timeout=5
                )
            ],
            supporting_files=[
                'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/top-128.txt', 
                'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/qakbot.bat'
            ]
        )
    ]

    Returns:
        [list]: A list of modified Atomic objects that will be used to run 
                either remotely or locally.
    """
    __run_list = []
    if self.__config_file:
        __run_list = self.__parse_test_guids(self.__config_file)

    for item in self.__build_run_list(
        techniques=self.parse_input_lists(self.techniques) if self.techniques else [],
        test_guids=self.parse_input_lists(self.test_guids) if self.test_guids else [],
        host_list=self.__host_list,
        select_tests=self.select_tests
        ):
        __run_list.append(item)
    return __run_list

Config

The main configuration class used across atomic-operator

Raises:

Type Description
AtomicsFolderNotFound

Raised when unable to find the provided atomics_path value

Source code in atomic_operator/models.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@attr.s(frozen=True)
class Config:

    """The main configuration class used across atomic-operator

    Raises:
        AtomicsFolderNotFound: Raised when unable to find the provided atomics_path value
    """

    atomics_path          = attr.ib()
    check_prereqs         = attr.ib(default=False)
    get_prereqs           = attr.ib(default=False)
    cleanup               = attr.ib(default=False)
    command_timeout       = attr.ib(default=20)
    debug                 = attr.ib(default=False)
    prompt_for_input_args = attr.ib(default=False)
    kwargs                = attr.ib(default={})
    copy_source_files     = attr.ib(default=True)

    def __attrs_post_init__(self):
        object.__setattr__(self, 'atomics_path', self.__get_abs_path(self.atomics_path))

    def __get_abs_path(self, value):
        return os.path.abspath(os.path.expanduser(os.path.expandvars(value)))

    @atomics_path.validator
    def validate_atomics_path(self, attribute, value):
        value = self.__get_abs_path(value)
        if not os.path.exists(value):
            raise AtomicsFolderNotFound('Please provide a value for atomics_path that exists')

Atomic

A single Atomic data structure. Each Atomic (technique) will contain a list of one or more AtomicTest objects.

Source code in atomic_operator/atomic/atomic.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@attr.s
class Atomic:
    """A single Atomic data structure. Each Atomic (technique)
    will contain a list of one or more AtomicTest objects.
    """

    attack_technique                      = attr.ib()
    display_name                          = attr.ib()
    path                                  = attr.ib()
    atomic_tests: typing.List[AtomicTest] = attr.ib()
    hosts: typing.List[Host]              = attr.ib(default=None)

    def __attrs_post_init__(self):
        if self.atomic_tests:
            test_list = []
            for test in self.atomic_tests:
                test_list.append(AtomicTest(**test))
            self.atomic_tests = test_list

AtomicTest

A single Atomic test object structure

Returns:

Name Type Description
AtomicTest

A single Atomic test object

Source code in atomic_operator/atomic/atomictest.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@attr.s
class AtomicTest:
    """A single Atomic test object structure

    Returns:
        AtomicTest: A single Atomic test object
    """

    name                                        = attr.ib()
    description                                 = attr.ib()
    supported_platforms                         = attr.ib()
    auto_generated_guid                         = attr.ib()
    executor                                    = attr.ib()
    input_arguments                             = attr.ib(default=None)
    dependency_executor_name                    = attr.ib(default=None)
    dependencies: typing.List[AtomicDependency] = attr.ib(default=[])

    def __attrs_post_init__(self):
        if self.input_arguments:
            temp_list = []
            for key,val in self.input_arguments.items():
                argument_dict = {}
                argument_dict = val
                argument_dict.update({'name': key, 'value': val.get('default')})
                temp_list.append(AtomicTestInput(**argument_dict))
            self.input_arguments = temp_list
        if self.executor:
            executor_dict = self.executor
            if executor_dict.get('name') == 'manual':
                if not executor_dict.get('command'):
                    executor_dict['command'] = ''
            self.executor = AtomicExecutor(**executor_dict)
            executor_dict = None
        else:
            self.executor = []
        if self.dependencies:
            dependency_list = []
            for dependency in self.dependencies:
                dependency_list.append(AtomicDependency(**dependency))
            self.dependencies = dependency_list

Loader

Bases: Base

Source code in atomic_operator/atomic/loader.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
class Loader(Base):

    __techniques = {}
    TECHNIQUE_DIRECTORY_PATTERN = 'T*'

    def __get_file_name(self, path) -> str:
        return path.name.rstrip('.yaml')

    def find_atomics(self, atomics_path, pattern='**/T*/T*.yaml') -> list:
        """Attempts to find the atomics folder within the provided atomics_path

        Args:
            atomics_path (str): A path to the atomic-red-team directory
            pattern (str, optional): Pattern used to find atomics and their required yaml files. Defaults to '**/T*/T*.yaml'.

        Returns:
            list: A list of paths of all identified atomics found in the given directory
        """
        result = []
        path = PurePath(atomics_path)
        for p in Path(path).rglob(pattern):
            result.append(p.resolve())
        return result

    def load_technique(self, path_to_dir) -> dict:
        """Loads a provided yaml file which is typically an Atomic defintiion or configuration file.

        Args:
            path_to_dir (str): A string path to a yaml formatted file

        Returns:
            dict: Returns the loaded yaml file in a dictionary format
        """
        try:
            with open(self.get_abs_path(path_to_dir), 'r', encoding="utf-8") as f:
                return yaml.safe_load(f.read())
        except:
            self.__logger.warning(f"Unable to load technique in '{path_to_dir}'")

        try:
            # windows does not like get_abs_path so casting to string
            with open(str(path_to_dir), 'r', encoding="utf-8") as f:
                return yaml.safe_load(f.read())
        except OSError as oe:
            self.__logger.warning(f"Unable to load technique in '{path_to_dir}': {oe}")

    def load_techniques(self) -> dict:
        """The main entrypoint when loading techniques from disk.

        Raises:
            AtomicsFolderNotFound: Thrown when unable to find the folder containing
                       Atomic tests

        Returns:
            dict: A dict with the key(s) as the Atomic technique ID and the val
                  is a list of Atomic objects.
        """
        atomics_path = Base.CONFIG.atomics_path
        if not os.path.exists(self.get_abs_path(atomics_path)):
            atomics_path = self.find_atomics(self.get_abs_path(__file__))
            if not atomics_path:
                raise AtomicsFolderNotFound('Unable to find any atomics folder')
        else:
            atomics_path = self.find_atomics(atomics_path)
            if not atomics_path:
                raise AtomicsFolderNotFound('Unable to find any atomics folder')

        for atomic_entry in atomics_path:
            technique = self.__get_file_name(atomic_entry)
            if not self.__techniques.get(technique):
                loaded_technique = self.load_technique(str(atomic_entry))
                if loaded_technique:
                    loaded_technique.update({'path': os.path.dirname(str(atomic_entry))})
                    self.__techniques[technique] = Atomic(**loaded_technique)
        return self.__techniques

find_atomics(atomics_path, pattern='**/T*/T*.yaml')

Attempts to find the atomics folder within the provided atomics_path

Parameters:

Name Type Description Default
atomics_path str

A path to the atomic-red-team directory

required
pattern str

Pattern used to find atomics and their required yaml files. Defaults to '/T/T.yaml'.

'**/T*/T*.yaml'

Returns:

Name Type Description
list list

A list of paths of all identified atomics found in the given directory

Source code in atomic_operator/atomic/loader.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def find_atomics(self, atomics_path, pattern='**/T*/T*.yaml') -> list:
    """Attempts to find the atomics folder within the provided atomics_path

    Args:
        atomics_path (str): A path to the atomic-red-team directory
        pattern (str, optional): Pattern used to find atomics and their required yaml files. Defaults to '**/T*/T*.yaml'.

    Returns:
        list: A list of paths of all identified atomics found in the given directory
    """
    result = []
    path = PurePath(atomics_path)
    for p in Path(path).rglob(pattern):
        result.append(p.resolve())
    return result

load_technique(path_to_dir)

Loads a provided yaml file which is typically an Atomic defintiion or configuration file.

Parameters:

Name Type Description Default
path_to_dir str

A string path to a yaml formatted file

required

Returns:

Name Type Description
dict dict

Returns the loaded yaml file in a dictionary format

Source code in atomic_operator/atomic/loader.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def load_technique(self, path_to_dir) -> dict:
    """Loads a provided yaml file which is typically an Atomic defintiion or configuration file.

    Args:
        path_to_dir (str): A string path to a yaml formatted file

    Returns:
        dict: Returns the loaded yaml file in a dictionary format
    """
    try:
        with open(self.get_abs_path(path_to_dir), 'r', encoding="utf-8") as f:
            return yaml.safe_load(f.read())
    except:
        self.__logger.warning(f"Unable to load technique in '{path_to_dir}'")

    try:
        # windows does not like get_abs_path so casting to string
        with open(str(path_to_dir), 'r', encoding="utf-8") as f:
            return yaml.safe_load(f.read())
    except OSError as oe:
        self.__logger.warning(f"Unable to load technique in '{path_to_dir}': {oe}")

load_techniques()

The main entrypoint when loading techniques from disk.

Raises:

Type Description
AtomicsFolderNotFound

Thrown when unable to find the folder containing Atomic tests

Returns:

Name Type Description
dict dict

A dict with the key(s) as the Atomic technique ID and the val is a list of Atomic objects.

Source code in atomic_operator/atomic/loader.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def load_techniques(self) -> dict:
    """The main entrypoint when loading techniques from disk.

    Raises:
        AtomicsFolderNotFound: Thrown when unable to find the folder containing
                   Atomic tests

    Returns:
        dict: A dict with the key(s) as the Atomic technique ID and the val
              is a list of Atomic objects.
    """
    atomics_path = Base.CONFIG.atomics_path
    if not os.path.exists(self.get_abs_path(atomics_path)):
        atomics_path = self.find_atomics(self.get_abs_path(__file__))
        if not atomics_path:
            raise AtomicsFolderNotFound('Unable to find any atomics folder')
    else:
        atomics_path = self.find_atomics(atomics_path)
        if not atomics_path:
            raise AtomicsFolderNotFound('Unable to find any atomics folder')

    for atomic_entry in atomics_path:
        technique = self.__get_file_name(atomic_entry)
        if not self.__techniques.get(technique):
            loaded_technique = self.load_technique(str(atomic_entry))
            if loaded_technique:
                loaded_technique.update({'path': os.path.dirname(str(atomic_entry))})
                self.__techniques[technique] = Atomic(**loaded_technique)
    return self.__techniques