Atomic Operator

AtomicOperator (Base)

Main class used to run Atomic Red Team tests.

atomic-operator runs Atomic Red Team tests both locally and remotely. Each test (an atomic) is a predefined procedure that mocks or emulates a specific technique.

config_file definition: atomic-operator's run method can be supplied with a path to a configuration file (config_file) which defines specific tests and/or values for input parameters to facilitate automation of said tests. An example of this config_file can be seen below:

        inventory:
          linux1:
            executor: ssh
            authentication:
              username: root
              password: UR4Swimlane!
              #ssh_key_path:
              port: 22
              timeout: 5
            hosts:
              # - 192.168.1.1
              - 10.32.100.199
              # etc.
        atomic_tests:
          - guid: f7e6ec05-c19e-4a80-a7e7-241027992fdb
            input_arguments:
              output_file:
                value: custom_output.txt
              input_file:
                value: custom_input.txt
          - guid: 3ff64f0b-3af2-3866-339d-38d9791407c3
            input_arguments:
              second_arg:
                value: SWAPPPED argument
          - guid: 32f90516-4bc9-43bd-b18d-2cbe0b7ca9b2
            inventories:
              - linux1

Exceptions:

Type Description
ValueError

If a provided technique is unknown we raise an error.

get_atomics(self, desintation='/home/runner/work/atomic-operator/atomic-operator', **kwargs)

Downloads the RedCanary atomic-red-team repository to your local system.

Parameters:

Name Type Description Default
desintation str

A folder path to download the repository data to. Defaults to os.getcwd().

'/home/runner/work/atomic-operator/atomic-operator'
kwargs dict

Keyword arguments passed through to the Python requests library during the download. Defaults to None.

{}

Returns:

Type Description
str

The path the data can be found at.

Source code in atomic_operator/atomic_operator.py
def get_atomics(self, desintation=os.getcwd(), **kwargs):
    """Downloads the RedCanary atomic-red-team repository to your local system.

    Args:
        desintation (str, optional): A folder path to download the repository data to. Defaults to os.getcwd().
        kwargs (dict, optional): Keyword arguments passed through to the Python requests library during the download. Defaults to None.

    Returns:
        str: The path the data can be found at.
    """
    if not os.path.exists(desintation):
        os.makedirs(desintation)
    desintation = kwargs.pop('destination') if kwargs.get('destination') else desintation
    folder_name = self.download_atomic_red_team_repo(
        save_path=desintation, 
        **kwargs
    )
    return os.path.join(desintation, folder_name)

run(self, techniques=['all'], test_guids=[], select_tests=False, atomics_path='/home/runner/work/atomic-operator/atomic-operator', check_prereqs=False, get_prereqs=False, cleanup=False, copy_source_files=True, command_timeout=20, debug=False, prompt_for_input_args=False, return_atomics=False, config_file=None, config_file_only=False, hosts=[], username=None, password=None, ssh_key_path=None, private_key_string=None, verify_ssl=False, ssh_port=22, ssh_timeout=5, *args, **kwargs)

The main method in which we run Atomic Red Team tests.

Parameters:

Name Type Description Default
techniques list

One or more defined techniques by attack_technique ID. Defaults to 'all'.

['all']
test_guids list

One or more Atomic test GUIDs. Defaults to None.

[]
select_tests bool

Select one or more tests from provided techniques. Defaults to False.

False
atomics_path str

The path of Atomic tests. Defaults to os.getcwd().

'/home/runner/work/atomic-operator/atomic-operator'
check_prereqs bool

Whether or not to check for prereq dependencies (prereq_command). Defaults to False.

False
get_prereqs bool

Whether or not you want to retrieve prerequisites. Defaults to False.

False
cleanup bool

Whether or not you want to run cleanup command(s). Defaults to False.

False
copy_source_files bool

Whether or not you want to copy any related source (src, bin, etc.) files to a remote host. Defaults to True.

True
command_timeout int

Timeout duration for each command. Defaults to 20.

20
debug bool

Whether or not you want to output details about tests being run. Defaults to False.

False
prompt_for_input_args bool

Whether you want to prompt for input arguments for each test. Defaults to False.

False
return_atomics bool

Whether or not you want to return atomics instead of running them. Defaults to False.

False
config_file str

A path to a config_file which is used to automate atomic-operator in environments. Defaults to None.

None
config_file_only bool

Whether or not you want to run tests based on the provided config_file only. Defaults to False.

False
hosts list

A list of one or more remote hosts to run a test on. Defaults to [].

[]
username str

Username for authentication of remote connections. Defaults to None.

None
password str

Password for authentication of remote connections. Defaults to None.

None
ssh_key_path str

Path to an SSH key for authentication of remote connections. Defaults to None.

None
private_key_string str

A private SSH key string used for authentication of remote connections. Defaults to None.

None
verify_ssl bool

Whether or not to verify SSL when connecting over RDP (Windows). Defaults to False.

False
ssh_port int

SSH port for authentication of remote connections. Defaults to 22.

22
ssh_timeout int

SSH timeout for authentication of remote connections. Defaults to 5.

5
kwargs dict

If provided, keys matching inputs for a test will be replaced. Default is None.

{}

Exceptions:

Type Description
ValueError

If a provided technique is unknown we raise an error.

Source code in atomic_operator/atomic_operator.py
def run(self, techniques: list=['all'], test_guids: list=[], select_tests=False,
              atomics_path=os.getcwd(), check_prereqs=False, get_prereqs=False, 
              cleanup=False, copy_source_files=True,command_timeout=20, debug=False, 
              prompt_for_input_args=False, return_atomics=False, config_file=None, 
              config_file_only=False, hosts=[], username=None, password=None, 
              ssh_key_path=None, private_key_string=None, verify_ssl=False, 
              ssh_port=22, ssh_timeout=5, *args, **kwargs) -> None:
    """The main method in which we run Atomic Red Team tests.

    Args:
        techniques (list, optional): One or more defined techniques by attack_technique ID. Defaults to 'all'.
        test_guids (list, optional): One or more Atomic test GUIDs. Defaults to None.
        select_tests (bool, optional): Select one or more tests from provided techniques. Defaults to False.
        atomics_path (str, optional): The path of Atomic tests. Defaults to os.getcwd().
        check_prereqs (bool, optional): Whether or not to check for prereq dependencies (prereq_command). Defaults to False.
        get_prereqs (bool, optional): Whether or not you want to retrieve prerequisites. Defaults to False.
        cleanup (bool, optional): Whether or not you want to run cleanup command(s). Defaults to False.
        copy_source_files (bool, optional): Whether or not you want to copy any related source (src, bin, etc.) files to a remote host. Defaults to True.
        command_timeout (int, optional): Timeout duration for each command. Defaults to 20.
        debug (bool, optional): Whether or not you want to output details about tests being run. Defaults to False.
        prompt_for_input_args (bool, optional): Whether you want to prompt for input arguments for each test. Defaults to False.
        return_atomics (bool, optional): Whether or not you want to return atomics instead of running them. Defaults to False.
        config_file (str, optional): A path to a config_file which is used to automate atomic-operator in environments. Defaults to None.
        config_file_only (bool, optional): Whether or not you want to run tests based on the provided config_file only. Defaults to False.
        hosts (list, optional): A list of one or more remote hosts to run a test on. Defaults to [].
        username (str, optional): Username for authentication of remote connections. Defaults to None.
        password (str, optional): Password for authentication of remote connections. Defaults to None.
        ssh_key_path (str, optional): Path to an SSH key for authentication of remote connections. Defaults to None.
        private_key_string (str, optional): A private SSH key string used for authentication of remote connections. Defaults to None.
        verify_ssl (bool, optional): Whether or not to verify SSL when connecting over RDP (Windows). Defaults to False.
        ssh_port (int, optional): SSH port for authentication of remote connections. Defaults to 22.
        ssh_timeout (int, optional): SSH timeout for authentication of remote connections. Defaults to 5.
        kwargs (dict, optional): If provided, keys matching inputs for a test will be replaced. Default is None.

    Raises:
        ValueError: If a provided technique is unknown we raise an error.
    """
    response = self.__check_arguments(kwargs, self.run)
    if response:
        return response
    if kwargs.get('help'):
        return self.help(method='run')
    if debug:
        import logging
        logging.getLogger().setLevel(logging.DEBUG)
    count = 0
    if check_prereqs:
        count += 1
    if get_prereqs:
        count += 1
    if cleanup:
        count += 1
    if count > 1:
        return IncorrectParameters(f"You have passed in incompatible arguments. Please only provide one of 'check_prereqs','get_prereqs','cleanup'.")
    atomics_path = self.__find_path(atomics_path)
    if not atomics_path:
        return AtomicsFolderNotFound('Unable to find a folder containing Atomics. Please provide a path or run get_atomics.')
    Base.CONFIG = Config(
        atomics_path          = atomics_path,
        check_prereqs         = check_prereqs,
        get_prereqs           = get_prereqs,
        cleanup               = cleanup,
        command_timeout       = command_timeout,
        debug                 = debug,
        prompt_for_input_args = prompt_for_input_args,
        kwargs                = kwargs,
        copy_source_files     = copy_source_files
    )
    # taking inputs from both config_file and passed in values via command
    # line to build a run_list of objects
    self.__config_parser = ConfigParser(
            config_file=config_file,
            techniques=None if config_file_only else self.parse_input_lists(techniques),
            test_guids=None if config_file_only else self.parse_input_lists(test_guids),
            host_list=None if config_file_only else self.parse_input_lists(hosts),
            username=username,
            password=password,
            ssh_key_path=ssh_key_path,
            private_key_string=private_key_string,
            verify_ssl=verify_ssl,
            ssh_port=ssh_port,
            ssh_timeout=ssh_timeout,
            select_tests=select_tests
        )
    self.__run_list = self.__config_parser.run_list

    __return_atomics = []
    for item in self.__run_list:
        if return_atomics:
            __return_atomics.append(item)
        elif kwargs.get('kwargs'):
            self.__run_technique(item, **kwargs.get('kwargs'))
        else:
            self.__run_technique(item)
    if return_atomics and __return_atomics:
        return __return_atomics
    return self.__test_responses
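
The run method treats check_prereqs, get_prereqs, and cleanup as mutually exclusive. The counting logic can be sketched on its own as shown here. The helper name exclusive_mode is hypothetical and not part of atomic-operator, and unlike the source above, this sketch raises ValueError instead of returning an exception object.

```python
def exclusive_mode(check_prereqs=False, get_prereqs=False, cleanup=False):
    """Return the single selected mode name, or None when no mode flag is set.

    Hypothetical helper: it mirrors the flag counting in run(), but raises
    on conflicting flags rather than returning an exception instance.
    """
    flags = {
        "check_prereqs": check_prereqs,
        "get_prereqs": get_prereqs,
        "cleanup": cleanup,
    }
    # Collect the names of every flag that was switched on.
    selected = [name for name, enabled in flags.items() if enabled]
    if len(selected) > 1:
        raise ValueError(
            "Please only provide one of 'check_prereqs', 'get_prereqs', "
            f"'cleanup'; got {selected}."
        )
    return selected[0] if selected else None


print(exclusive_mode(cleanup=True))
```

Passing no flag at all is valid and corresponds to simply running the selected tests.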

Base

download_atomic_red_team_repo(self, save_path, **kwargs)

Downloads the Atomic Red Team repository from GitHub

Parameters:

Name Type Description Default
save_path str

The path to save the downloaded and extracted ZIP contents

required

Returns:

Type Description
str

A string of the location the data was saved to.

Source code in atomic_operator/base.py
def download_atomic_red_team_repo(self, save_path, **kwargs) -> str:
    """Downloads the Atomic Red Team repository from github

    Args:
        save_path (str): The path to save the downloaded and extracted ZIP contents

    Returns:
        str: A string of the location the data was saved to.
    """
    response = requests.get(Base.ATOMIC_RED_TEAM_REPO, stream=True, **kwargs)
    z = zipfile.ZipFile(BytesIO(response.content))
    with zipfile.ZipFile(BytesIO(response.content)) as zf:
        for member in zf.infolist():
            file_path = os.path.realpath(os.path.join(save_path, member.filename))
            if file_path.startswith(os.path.realpath(save_path)):
                zf.extract(member, save_path)
    return z.namelist()[0]
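
The extraction loop above skips any archive member whose resolved path would land outside save_path. A minimal standalone sketch of that guard follows. The names safe_extract and build_archive are hypothetical, the archive is built in memory instead of being downloaded, and the containment test uses os.path.commonpath as a stricter variant of the startswith comparison in the source.

```python
import os
import tempfile
import zipfile
from io import BytesIO


def build_archive():
    """Build a small in-memory ZIP with one normal and one escaping member."""
    buffer = BytesIO()
    with zipfile.ZipFile(buffer, "w") as zf:
        zf.writestr("repo/atomics/T1016/T1016.yaml", "attack_technique: T1016\n")
        zf.writestr("../evil.txt", "should be skipped\n")
    return buffer.getvalue()


def safe_extract(zip_bytes, save_path):
    """Extract only members whose resolved path stays inside save_path."""
    extracted = []
    root = os.path.realpath(save_path)
    with zipfile.ZipFile(BytesIO(zip_bytes)) as zf:
        for member in zf.infolist():
            target = os.path.realpath(os.path.join(save_path, member.filename))
            # Variant of the startswith check: compare whole path components.
            if os.path.commonpath([root, target]) == root:
                zf.extract(member, save_path)
                extracted.append(member.filename)
    return extracted


with tempfile.TemporaryDirectory() as tmp:
    destination = os.path.join(tmp, "download")
    os.makedirs(destination)
    print(safe_extract(build_archive(), destination))
```

Comparing path components avoids treating a sibling directory such as download2 as being inside download, which a plain string prefix comparison would allow.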

get_abs_path(self, value)

Formats and returns the absolute path for a path value

Parameters:

Name Type Description Default
value str

A path string in many different accepted formats

required

Returns:

Type Description
str

The absolute path of the provided string

Source code in atomic_operator/base.py
def get_abs_path(self, value) -> str:
    """Formats and returns the absolute path for a path value

    Args:
        value (str): A path string in many different accepted formats

    Returns:
        str: The absolute path of the provided string
    """
    return os.path.abspath(os.path.expanduser(os.path.expandvars(value)))
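
As an illustration of the formats this one-liner accepts, the sketch below reproduces it as a plain function and calls it with an environment variable, a home-relative path, and a relative path. The ATOMICS_HOME variable is made up for the example.

```python
import os


def get_abs_path(value):
    """Expand environment variables and '~', then make the path absolute."""
    return os.path.abspath(os.path.expanduser(os.path.expandvars(value)))


# Hypothetical environment variable, set only for this demonstration.
os.environ["ATOMICS_HOME"] = os.path.join(os.sep, "opt", "atomics")

print(get_abs_path("$ATOMICS_HOME/T1016"))  # variable is expanded first
print(get_abs_path("~/atomics"))            # then the home directory
print(get_abs_path("relative/atomics"))     # finally resolved against the cwd
```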

get_local_system_platform(self)

Identifies the local system's operating system platform

Returns:

Type Description
str

The current/local system's operating system platform

Source code in atomic_operator/base.py
def get_local_system_platform(self) -> str:
    """Identifies the local systems operating system platform

    Returns:
        str: The current/local system's operating system platform
    """
    os_name = platform.system().lower()
    if os_name == "darwin":
        return "macos"
    return os_name

prompt_user_for_input(self, title, input_object)

Prompts user for input values based on the provided values.

Source code in atomic_operator/base.py
    def prompt_user_for_input(self, title, input_object):
        """Prompts user for input values based on the provided values.
        """
        print(f"""
Inputs for {title}:
    Input Name: {input_object.name}
    Default:     {input_object.default}
    Description: {input_object.description}
""")
        print(f"Please provide a value for {input_object.name} (If blank, default is used):",)
        value = sys.stdin.readline()
        if bool(value):
            return value
        return input_object.default
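
One detail worth noting about the listing above: sys.stdin.readline() keeps the trailing newline, so pressing Enter yields the string "\n", which is truthy, and only end-of-file yields an empty string. The sketch below is a hypothetical variant (prompt_for_value is not part of atomic-operator) that strips whitespace so a blank line falls back to the default. It accepts any file-like stream so it can be exercised without a terminal.

```python
import io
import sys
from types import SimpleNamespace


def prompt_for_value(input_object, stream=None):
    """Read one line and return it, or the default when the line is blank."""
    stream = stream if stream is not None else sys.stdin
    print(f"Please provide a value for {input_object.name} (If blank, default is used):")
    # strip() removes the trailing newline, so an empty answer becomes "".
    value = stream.readline().strip()
    return value if value else input_object.default


# Stand-in for an input argument object with name and default attributes.
arg = SimpleNamespace(name="output_file", default="default_output.txt")

print(prompt_for_value(arg, io.StringIO("\n")))
print(prompt_for_value(arg, io.StringIO("custom_output.txt\n")))
```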

ConfigParser (Base)

config property readonly

Returns raw converted config_file passed into class

Returns:

Type Description
[dict]

Returns the converted config_file as dictionary.

run_list property readonly

Returns a list of Atomic objects that will be run.

    This list combines Atomics and potentially filters 
    tests defined within that Atomic object based on passed
    in parameters and config_file.

    Additionally, a list of Host objects are added to their
    defined techniques or test_guids based on config and/or
    passed in parameters.

    [
        Atomic(
            attack_technique='T1016',
            display_name='System Network Configuration Discovery',
            path='/Users/josh.rickard/_Swimlane2/atomic-operator/redcanaryco-atomic-red-team-22dd2fb/atomics/T1016',
            atomic_tests=[
                AtomicTest(
                    name='System Network Configuration Discovery',
                    description='Identify network configuration information.\n\nUpon successful execution, ...',
                    supported_platforms=['macos', 'linux'],
                    auto_generated_guid='c141bbdb-7fca-4254-9fd6-f47e79447e17',
                    executor=AtomicExecutor(
                        name='sh',
                        command='if [ -x "$(command -v arp)" ]; then arp -a; else echo "arp is missing from ....',
                        cleanup_command=None,
                        elevation_required=False, steps=None
                    ),
                    input_arguments=None,
                    dependency_executor_name=None,
                    dependencies=[]
                )
            ],
            hosts=[
                Host(
                    hostname='192.168.1.1',
                    username='username',
                    password='some_passowrd!',
                    verify_ssl=False,
                    ssh_key_path=None,
                    private_key_string=None,
                    port=22,
                    timeout=5
                )
            ],
            supporting_files=[
                'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/top-128.txt',
                'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/qakbot.bat'
            ]
        )
    ]

Returns:

Type Description
[list]

A list of modified Atomic objects that will be used to run either remotely or locally.

__init__(self, config_file=None, techniques=None, test_guids=None, host_list=None, username=None, password=None, ssh_key_path=None, private_key_string=None, verify_ssl=False, ssh_port=22, ssh_timeout=5, select_tests=False) special

Parses a provided config file as well as parameters to build a run list

    This list combines Atomics and potentially filters 
    tests defined within that Atomic object based on passed
    in parameters and config_file.

    Additionally, a list of Host objects are added to their
    defined techniques or test_guids based on config and/or
    passed in parameters.

    Example: Example structure returned from provided values
    [
        Atomic(
            attack_technique='T1016',
            display_name='System Network Configuration Discovery',
            path='/Users/josh.rickard/_Swimlane2/atomic-operator/redcanaryco-atomic-red-team-22dd2fb/atomics/T1016',
            atomic_tests=[
                AtomicTest(
                    name='System Network Configuration Discovery',
                    description='Identify network configuration information.\n\nUpon successful execution, ...',
                    supported_platforms=['macos', 'linux'],
                    auto_generated_guid='c141bbdb-7fca-4254-9fd6-f47e79447e17',
                    executor=AtomicExecutor(
                        name='sh',
                        command='if [ -x "$(command -v arp)" ]; then arp -a; else echo "arp is missing from ....',
                        cleanup_command=None,
                        elevation_required=False, steps=None
                    ),
                    input_arguments=None,
                    dependency_executor_name=None,
                    dependencies=[]
                )
            ],
            hosts=[
                Host(
                    hostname='192.168.1.1',
                    username='username',
                    password='some_passowrd!',
                    verify_ssl=False,
                    ssh_key_path=None,
                    private_key_string=None,
                    port=22,
                    timeout=5
                )
            ],
            supporting_files=[
                'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/top-128.txt',
                'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/qakbot.bat'
            ]
        )
    ]

Source code in atomic_operator/configparser.py
def __init__(self, config_file=None, techniques=None, test_guids=None, 
                   host_list=None, username=None, password=None,
                   ssh_key_path=None, private_key_string=None, verify_ssl=False,
                   ssh_port=22, ssh_timeout=5, select_tests=False
            ):
    """Parses a provided config file as well as parameters to build a run list

    This list combines Atomics and potentially filters 
    tests defined within that Atomic object based on passed
    in parameters and config_file.

    Additionally, a list of Host objects are added to their
    defined techniques or test_guids based on config and/or
    passed in parameters.

    Example: Example structure returned from provided values
    [
        Atomic(
            attack_technique='T1016', 
            display_name='System Network Configuration Discovery', 
            path='/Users/josh.rickard/_Swimlane2/atomic-operator/redcanaryco-atomic-red-team-22dd2fb/atomics/T1016', 
            atomic_tests=[
                AtomicTest(
                    name='System Network Configuration Discovery', 
                    description='Identify network configuration information.\n\nUpon successful execution, ...', 
                    supported_platforms=['macos', 'linux'], 
                    auto_generated_guid='c141bbdb-7fca-4254-9fd6-f47e79447e17', 
                    executor=AtomicExecutor(
                        name='sh', 
                        command='if [ -x "$(command -v arp)" ]; then arp -a; else echo "arp is missing from ....', 
                        cleanup_command=None, 
                        elevation_required=False, steps=None
                    ), 
                    input_arguments=None, 
                    dependency_executor_name=None, 
                    dependencies=[]
                )
            ], 
            hosts=[
                Host(
                    hostname='192.168.1.1', 
                    username='username', 
                    password='some_passowrd!', 
                    verify_ssl=False, 
                    ssh_key_path=None, 
                    private_key_string=None, 
                    port=22, 
                    timeout=5
                )
            ],
            supporting_files=[
                'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/top-128.txt', 
                'redcanaryco-atomic-red-team-22dd2fb/atomics/T1016/src/qakbot.bat'
            ]
        )
    ]
    """
    self.__config_file = self.__load_config(config_file)
    self.techniques = techniques
    self.test_guids = test_guids
    self.select_tests = select_tests
    self.__host_list = []
    if host_list:
        for host in self.parse_input_lists(host_list):
            self.__host_list.append(self.__create_remote_host_object(
                hostname=host,
                username=username,
                password=password,
                ssh_key_path=ssh_key_path,
                private_key_string=private_key_string,
                verify_ssl=verify_ssl,
                ssh_port=ssh_port,
                ssh_timeout=ssh_timeout
            ))

get_inputs(self, guid)

Retrieves any defined inputs for a given atomic test GUID

Parameters:

Name Type Description Default
guid str

An Atomic test GUID

required

Returns:

Type Description
dict

A dictionary of defined input arguments or empty

Source code in atomic_operator/configparser.py
def get_inputs(self, guid: str): 
    """Retrieves any defined inputs for a given atomic test GUID

    Args:
        guid (str): An Atomic test GUID

    Returns:
        dict: A dictionary of defined input arguments or empty
    """
    if self.__config_file:
        for item in self.__config_file['atomic_tests']:
            if item['guid'] == guid:
                return item.get('input_arguments', {})
    return {}
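
To make the lookup concrete, the sketch below applies the same logic to a plain dictionary shaped like the parsed config_file example from the AtomicOperator section. It is a standalone function for illustration, not the ConfigParser method itself, and it takes the config as an explicit argument.

```python
# Parsed form of part of the example config_file shown earlier.
config = {
    "atomic_tests": [
        {
            "guid": "f7e6ec05-c19e-4a80-a7e7-241027992fdb",
            "input_arguments": {
                "output_file": {"value": "custom_output.txt"},
                "input_file": {"value": "custom_input.txt"},
            },
        },
        {
            "guid": "32f90516-4bc9-43bd-b18d-2cbe0b7ca9b2",
            "inventories": ["linux1"],
        },
    ]
}


def get_inputs(config, guid):
    """Return the input_arguments defined for guid, or {} when none exist."""
    if config:
        for item in config["atomic_tests"]:
            if item["guid"] == guid:
                # A test may be listed without any input_arguments.
                return item.get("input_arguments", {})
    return {}


print(get_inputs(config, "f7e6ec05-c19e-4a80-a7e7-241027992fdb"))
print(get_inputs(config, "32f90516-4bc9-43bd-b18d-2cbe0b7ca9b2"))
```

A GUID that is listed without input_arguments and a GUID that is not listed at all both produce an empty dictionary, so callers cannot tell the two cases apart from this method alone; is_defined covers that distinction.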

is_defined(self, guid)

Checks to see if a GUID is defined within a config file

Parameters:

Name Type Description Default
guid str

The GUID defined within a parsed config file

required

Returns:

Type Description
[bool]

Returns True if GUID is defined within parsed config_file

Source code in atomic_operator/configparser.py
def is_defined(self, guid: str):
    """Checks to see if a GUID is defined within a config file

    Args:
        guid (str): The GUID defined within a parsed config file

    Returns:
        [bool]: Returns True if GUID is defined within parsed config_file
    """
    if self.__config_file:
        for item in self.__config_file['atomic_tests']:
            if item['guid'] == guid:
                return True
    return False

Config

The main configuration class used across atomic-operator

Exceptions:

Type Description
AtomicsFolderNotFound

Raised when unable to find the provided atomics_path value

__init__(self, atomics_path, check_prereqs=False, get_prereqs=False, cleanup=False, command_timeout=20, debug=False, prompt_for_input_args=False, kwargs={}, copy_source_files=True) special

Method generated by attrs for class Config.

Source code in atomic_operator/models.py
def __init__(self, atomics_path, check_prereqs=attr_dict['check_prereqs'].default, get_prereqs=attr_dict['get_prereqs'].default, cleanup=attr_dict['cleanup'].default, command_timeout=attr_dict['command_timeout'].default, debug=attr_dict['debug'].default, prompt_for_input_args=attr_dict['prompt_for_input_args'].default, kwargs=attr_dict['kwargs'].default, copy_source_files=attr_dict['copy_source_files'].default):
    _setattr = _cached_setattr.__get__(self, self.__class__)
    _inst_dict = self.__dict__
    _inst_dict['atomics_path'] = atomics_path
    _inst_dict['check_prereqs'] = check_prereqs
    _inst_dict['get_prereqs'] = get_prereqs
    _inst_dict['cleanup'] = cleanup
    _inst_dict['command_timeout'] = command_timeout
    _inst_dict['debug'] = debug
    _inst_dict['prompt_for_input_args'] = prompt_for_input_args
    _inst_dict['kwargs'] = kwargs
    _inst_dict['copy_source_files'] = copy_source_files
    if _config._run_validators is True:
        __attr_validator_atomics_path(self, __attr_atomics_path, self.atomics_path)
    self.__attrs_post_init__()

Host

__init__(self, hostname, username=None, password=None, verify_ssl=False, ssh_key_path=None, private_key_string=None, port=22, timeout=5) special

Method generated by attrs for class Host.

Source code in atomic_operator/models.py
def __init__(self, hostname, username=attr_dict['username'].default, password=attr_dict['password'].default, verify_ssl=attr_dict['verify_ssl'].default, ssh_key_path=attr_dict['ssh_key_path'].default, private_key_string=attr_dict['private_key_string'].default, port=attr_dict['port'].default, timeout=attr_dict['timeout'].default):
    self.hostname = hostname
    self.username = username
    self.password = password
    self.verify_ssl = verify_ssl
    self.ssh_key_path = ssh_key_path
    self.private_key_string = private_key_string
    self.port = port
    self.timeout = timeout
    if _config._run_validators is True:
        __attr_validator_ssh_key_path(self, __attr_ssh_key_path, self.ssh_key_path)

Atomic

A single Atomic data structure. Each Atomic (technique) will contain a list of one or more AtomicTest objects.

__init__(self, attack_technique, display_name, path, atomic_tests, hosts=None) special

Method generated by attrs for class Atomic.

Source code in atomic_operator/atomic/atomic.py
def __init__(self, attack_technique, display_name, path, atomic_tests, hosts=attr_dict['hosts'].default):
    self.attack_technique = attack_technique
    self.display_name = display_name
    self.path = path
    self.atomic_tests = atomic_tests
    self.hosts = hosts
    self.__attrs_post_init__()

AtomicDependency

__init__(self, description, get_prereq_command=None, prereq_command=None) special

Method generated by attrs for class AtomicDependency.

Source code in atomic_operator/atomic/atomictest.py
def __init__(self, description, get_prereq_command=attr_dict['get_prereq_command'].default, prereq_command=attr_dict['prereq_command'].default):
    self.description = description
    self.get_prereq_command = get_prereq_command
    self.prereq_command = prereq_command

AtomicExecutor

__init__(self, name, command, cleanup_command=None, elevation_required=False, steps=None) special

Method generated by attrs for class AtomicExecutor.

Source code in atomic_operator/atomic/atomictest.py
def __init__(self, name, command, cleanup_command=attr_dict['cleanup_command'].default, elevation_required=attr_dict['elevation_required'].default, steps=attr_dict['steps'].default):
    self.name = name
    self.command = command
    self.cleanup_command = cleanup_command
    self.elevation_required = elevation_required
    self.steps = steps

AtomicTest

A single Atomic test object structure

Returns:

Type Description
AtomicTest

A single Atomic test object

__init__(self, name, description, supported_platforms, auto_generated_guid, executor, input_arguments=None, dependency_executor_name=None, dependencies=[]) special

Method generated by attrs for class AtomicTest.

Source code in atomic_operator/atomic/atomictest.py
def __init__(self, name, description, supported_platforms, auto_generated_guid, executor, input_arguments=attr_dict['input_arguments'].default, dependency_executor_name=attr_dict['dependency_executor_name'].default, dependencies=attr_dict['dependencies'].default):
    self.name = name
    self.description = description
    self.supported_platforms = supported_platforms
    self.auto_generated_guid = auto_generated_guid
    self.executor = executor
    self.input_arguments = input_arguments
    self.dependency_executor_name = dependency_executor_name
    self.dependencies = dependencies
    self.__attrs_post_init__()

AtomicTestInput

__init__(self, name, description, type, default, value=None, source=None, destination=None) special

Method generated by attrs for class AtomicTestInput.

Source code in atomic_operator/atomic/atomictest.py
def __init__(self, name, description, type, default, value=attr_dict['value'].default, source=attr_dict['source'].default, destination=attr_dict['destination'].default):
    self.name = name
    self.description = description
    self.type = type
    self.default = default
    self.value = value
    self.source = source
    self.destination = destination

Loader (Base)

find_atomics(self, atomics_path, pattern='**/T*/T*.yaml')

Attempts to find the atomics folder within the provided atomics_path

Parameters:

Name Type Description Default
atomics_path str

A path to the atomic-red-team directory

required
pattern str

Pattern used to find atomics and their required yaml files. Defaults to '**/T*/T*.yaml'.

'**/T*/T*.yaml'

Returns:

Type Description
list

A list of paths of all identified atomics found in the given directory

Source code in atomic_operator/atomic/loader.py
def find_atomics(self, atomics_path, pattern='**/T*/T*.yaml') -> list:
    """Attempts to find the atomics folder within the provided atomics_path

    Args:
        atomics_path (str): A path to the atomic-red-team directory
        pattern (str, optional): Pattern used to find atomics and their required yaml files. Defaults to '**/T*/T*.yaml'.

    Returns:
        list: A list of paths of all identified atomics found in the given directory
    """
    result = []
    path = PurePath(atomics_path)
    for p in Path(path).rglob(pattern):
        result.append(p.resolve())
    return result
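
The default pattern matches YAML files whose name starts with T and whose parent directory also starts with T. The sketch below demonstrates this against a small temporary directory tree. The layout and the build_sample_tree helper are made up for the example, and results are collected into a set as a precaution against a path being reported more than once.

```python
import tempfile
from pathlib import Path


def find_atomics(atomics_path, pattern="**/T*/T*.yaml"):
    """Return the resolved paths of all files matching pattern, sorted."""
    matches = {p.resolve() for p in Path(atomics_path).rglob(pattern)}
    return sorted(matches)


def build_sample_tree(root):
    """Create a tiny, made-up directory layout for the demonstration."""
    technique_dir = Path(root) / "atomics" / "T1016"
    (technique_dir / "src").mkdir(parents=True)
    (technique_dir / "T1016.yaml").write_text("attack_technique: T1016\n")
    # Neither of these should match: wrong parent directory or file name.
    (technique_dir / "src" / "notes.yaml").write_text("ignored: true\n")
    index_dir = Path(root) / "atomics" / "Indexes"
    index_dir.mkdir()
    (index_dir / "index.yaml").write_text("ignored: true\n")
    return root


with tempfile.TemporaryDirectory() as tmp:
    build_sample_tree(tmp)
    print([p.name for p in find_atomics(tmp)])
```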

load_technique(self, path_to_dir)

Loads a provided yaml file which is typically an Atomic definition or configuration file.

Parameters:

Name Type Description Default
path_to_dir str

A string path to a yaml formatted file

required

Returns:

Type Description
dict

Returns the loaded yaml file in a dictionary format

Source code in atomic_operator/atomic/loader.py
def load_technique(self, path_to_dir) -> dict:
    """Loads a provided yaml file which is typically an Atomic defintiion or configuration file.

    Args:
        path_to_dir (str): A string path to a yaml formatted file

    Returns:
        dict: Returns the loaded yaml file in a dictionary format
    """
    try:
        with open(self.get_abs_path(path_to_dir), 'r', encoding="utf-8") as f:
            return yaml.safe_load(f.read())
    except:
        # windows does not like get_abs_path so casting to string
        with open(str(path_to_dir), 'r', encoding="utf-8") as f:
            return yaml.safe_load(f.read())

load_techniques(self)

The main entrypoint when loading techniques from disk.

Exceptions:

Type Description
AtomicsFolderNotFound

Thrown when unable to find the folder containing Atomic tests

Returns:

Type Description
dict

A dict whose keys are Atomic technique IDs and whose values are the corresponding Atomic objects.

Source code in atomic_operator/atomic/loader.py
def load_techniques(self) -> dict:
    """The main entrypoint when loading techniques from disk.

    Raises:
        AtomicsFolderNotFound: Thrown when unable to find the folder containing
                   Atomic tests

    Returns:
        dict: A dict whose keys are Atomic technique IDs and whose values
              are the corresponding Atomic objects.
    """
    atomics_path = Base.CONFIG.atomics_path
    if not os.path.exists(self.get_abs_path(atomics_path)):
        atomics_path = self.find_atomics(self.get_abs_path(__file__))
        if not atomics_path:
            raise AtomicsFolderNotFound('Unable to find any atomics folder')
    else:
        atomics_path = self.find_atomics(atomics_path)
        if not atomics_path:
            raise AtomicsFolderNotFound('Unable to find any atomics folder')

    for atomic_entry in atomics_path:
        technique = self.__get_file_name(atomic_entry)
        if not self.__techniques.get(technique):
            loaded_technique = self.load_technique(str(atomic_entry))
            loaded_technique.update({'path': os.path.dirname(str(atomic_entry))})
            self.__techniques[technique] = Atomic(**loaded_technique)
    return self.__techniques