Documentation

# Examples of Production-Ready Connectors

# S3 Download Connector

1. File - schema.json
2. File - pyproject.toml
3. File - app.py
4. File - models.py

# 1. File - schema.json

{
"id": "connectors.s3_download",
"name": "S3 Download",
"version": 1,
"tags": [
    "connectors",
    "storage",
    "download_files"
],
"features": [
    {
    "download_files": {
        "input_schema": "download_files_request_schema",
        "output_schema": "download_files_response_schema",
        "configuration": null,
        "exceptions": [
        "connectors.s3_connector.S3FileDownloadError"
        ],
        "displayname": "Read from S3",
        "hidden": false
    }
    }
],
"configuration": {},
"exceptions": [
    "connectors.s3_connector.InvalidConfigurationError",
    "connectors.s3_connector.ConfigurationLoadError"
],
"schema": [
    {
    "download_files_request_schema": {
        "entities": [
        {
            "connection": {
            "datatype": "connection",
            "mandatory": true,
            "hidden": false,
            "displayname": "Connection",
            "properties": {
                "label": "Connection",
                "options": [],
                "placeholder_text": "Select Connection",
                "component": "connectionSelect",
                "help_text": "Choose an S3 connection from the available list.",
                "parameter": true,
                "value_key": "connection",
                "display_order": 1,
                "option_label_key": "name",
                "option_value_key": "connection_id",
                "provider_name": "Amazon Web Service"
            }
            }
        },
        {
            "bucket_name": {
            "datatype": "string",
            "mandatory": true,
            "hidden": false,
            "displayname": "Bucket Name",
            "properties": {
                "label": "Bucket Name",
                "component": "input",
                "help_text": "Enter the name of the S3 bucket to download files from.",
                "parameter": true,
                "value_key": "bucket_name",
                "placeholder": "Provide Bucket Name",
                "display_order": 2
            }
            }
        },
        {
            "path": {
            "datatype": "string",
            "mandatory": true,
            "hidden": false,
            "displayname": "Path",
            "properties": {
                "label": "Path",
                "component": "input",
                "help_text": "Specify the path within the bucket to locate the files.",
                "parameter": true,
                "value_key": "path",
                "placeholder": "Provide Path",
                "display_order": 3
            }
            }
        }
        ]
    }
    },
    {
    "download_files_response_schema": {
        "entities": [
        {
            "files": {
            "datatype": "file",
            "displayname": "Files",
            "mandatory": true,
            "hidden": false,
            "values": [
                "png",
                "jpg",
                "jpeg",
                "pdf",
                "tif",
                "tiff"
            ],
            "is_array": true,
            "min_count": 1,
            "max_count": -1,
            "entities": {
                "file_name": {
                "datatype": "string",
                "mandatory": true,
                "hidden": false
                },
                "file_ref_id": {
                "datatype": "string",
                "mandatory": true,
                "hidden": false
                },
                "file_id": {
                "datatype": "string",
                "mandatory": false,
                "hidden": false
                },
                "mime_type": {
                "datatype": "string",
                "mandatory": false,
                "hidden": false
                },
                "total_pages": {
                "datatype": "number",
                "mandatory": true,
                "hidden": false
                }
            }
            }
        }
        ]
    }
    }
],
"deployment": {
    "type": "container",
    "shared": true,
    "properties": {
    "nodeAffinity": "",
    "autoScale": true
    },
    "workers": 2,
    "resources": {
    "cpu": 2,
    "mem": 2
    }
}
}

# 2. File - pyproject.toml

    [project]
    name = "read-from-s3-connector"
    version = "0.0.1"
    description = "Project built using the Purple Fabric Connectors Python SDK for integration with the Purple Fabric Platform."
    readme = "README.md"
    requires-python = ">=3.12"
    dependencies = ["pf-connectors-py-sdk==0.1.111", "boto3"]

    [tool.pf-connectors-py-sdk]
    asset_name = "S3 Download"
    asset_category = "INTEGRATOR"
    asset_description = "Retrieve files from an Amazon S3 bucket."

# 3. File - app.py

    from typing import List
    import os
    import boto3
    from botocore.exceptions import ClientError
    from generated.models import DownloadFilesInputs, DownloadFilesOutputs
    from pf_connectors_py_sdk import Context
    from pf_connectors_py_sdk.core.models import FileReference

    from exceptions import (
        AWSAccessException,
        InvalidBucketNameException,
        InvalidPathException,
        FileDownloadException,
        FileProcessingException,
        InvalidAuthenticationException
    )


    def _list_all_pages(s3_client, params: dict) -> tuple[list, list]:
        """Run ``list_objects_v2`` until the listing is exhausted and merge the pages.

        A single ``list_objects_v2`` response may be truncated, so the
        continuation token is followed until ``IsTruncated`` is no longer set.

        Args:
            s3_client: Object exposing a boto3-style ``list_objects_v2`` method.
            params: Keyword arguments for the first request; never mutated.

        Returns:
            A ``(contents, common_prefixes)`` pair with the merged 'Contents'
            and 'CommonPrefixes' entries of every page.
        """
        request = dict(params)  # copy so the caller's params never carry a token
        contents: list = []
        common_prefixes: list = []
        while True:
            page = s3_client.list_objects_v2(**request)
            contents.extend(page.get('Contents', []))
            common_prefixes.extend(page.get('CommonPrefixes', []))
            token = page.get('NextContinuationToken')
            if not page.get('IsTruncated') or not token:
                return contents, common_prefixes
            request['ContinuationToken'] = token


    def list_s3_objects(s3_client, bucket: str, prefix: str, recursive: bool = False) -> List[str]:
        """List the object keys found under a prefix. Handles file and directory paths.

        The prefix is tried first without a trailing slash (so it can match a
        single file), then with a trailing slash (directory form). All result
        pages are fetched, so listings larger than one response are complete.

        Args:
            s3_client: boto3 S3 client (or any object with ``list_objects_v2``).
            bucket: Name of the bucket to list.
            prefix: Key prefix; trailing slashes are ignored.
            recursive: When False, an additional delimiter-based listing is
                attempted if nothing was found, and the function re-runs itself
                in recursive mode if only common prefixes come back.

        Returns:
            Sorted list of keys, excluding directory markers (keys ending in '/').

        Raises:
            InvalidBucketNameException: If S3 reports ``NoSuchBucket``.
            AWSAccessException: For any other ``ClientError`` while listing.
        """
        base_prefix = prefix.rstrip('/')
        try:
            params = {'Bucket': bucket, 'Prefix': base_prefix}

            # First try the bare prefix so a path naming a single file matches.
            contents, common_prefixes = _list_all_pages(s3_client, params)

            # Nothing matched: retry using the directory form of the prefix.
            if not contents:
                params['Prefix'] = f"{base_prefix}/"
                contents, common_prefixes = _list_all_pages(s3_client, params)

            # Still nothing and not recursive: ask for one level with a delimiter.
            if not recursive and not contents:
                params['Delimiter'] = '/'
                contents, common_prefixes = _list_all_pages(s3_client, params)

            # Directory markers are zero-byte keys ending in '/'; skip them.
            files = [obj['Key'] for obj in contents if not obj['Key'].endswith('/')]

            # Only sub-"directories" were reported: descend with a recursive listing.
            if not recursive and not files and common_prefixes:
                return list_s3_objects(s3_client, bucket, prefix, recursive=True)

            return sorted(files)  # sorted for deterministic results

        except ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchBucket':
                raise InvalidBucketNameException(
                    f"Bucket '{bucket}' does not exist") from e
            raise AWSAccessException(f"Failed to list objects: {str(e)}") from e


    def _create_s3_client(connection):
        """Build a boto3 S3 client from the connection's authentication settings.

        Two authentication modes are accepted:

        * ``"IAM ROLE"`` - assume the role named by the ``AWS_OUTBOUND_IAM_ROLE``
          environment variable, then use those temporary credentials to assume
          ``connection.IAM_role_name``; the S3 client uses the second role's
          credentials.
        * ``"Secrets"`` - use the connection's static access key pair.

        Raises:
            InvalidAuthenticationException: If the mode is unknown or a value
                required by the selected mode is missing.
        """
        if connection.authentication == "IAM ROLE":
            if not connection.IAM_role_name:
                raise InvalidAuthenticationException(
                    "IAM role name must be provided for IAM Role authentication")

            outbound_role_arn = os.environ.get('AWS_OUTBOUND_IAM_ROLE')
            if not outbound_role_arn:
                raise InvalidAuthenticationException(
                    "AWS_OUTBOUND_IAM_ROLE environment variable must be set for cross-account access")

            # Step 1: assume the outbound role using the ambient credentials.
            outbound_credentials = boto3.client('sts').assume_role(
                RoleArn=outbound_role_arn,
                RoleSessionName="S3ConnectorOutboundSession"
            )['Credentials']

            sts_client_outbound = boto3.client(
                'sts',
                aws_access_key_id=outbound_credentials['AccessKeyId'],
                aws_secret_access_key=outbound_credentials['SecretAccessKey'],
                aws_session_token=outbound_credentials['SessionToken'],
                region_name=connection.region
            )

            # Step 2: use the outbound role to assume the destination role.
            # NOTE: despite its name, IAM_role_name is passed as the RoleArn.
            credentials = sts_client_outbound.assume_role(
                RoleArn=connection.IAM_role_name,
                RoleSessionName="S3ConnectorDestinationSession"
            )['Credentials']

            return boto3.client(
                's3',
                aws_access_key_id=credentials['AccessKeyId'],
                aws_secret_access_key=credentials['SecretAccessKey'],
                aws_session_token=credentials['SessionToken'],
                region_name=connection.region
            )

        if connection.authentication == "Secrets":
            if not connection.access_key_id or not connection.secret_access_key:
                raise InvalidAuthenticationException(
                    "Access key ID and secret access key must be provided for Secrets authentication")

            return boto3.client(
                's3',
                aws_access_key_id=connection.access_key_id,
                aws_secret_access_key=connection.secret_access_key,
                region_name=connection.region
            )

        # The message quotes the exact literals compared against above.
        raise InvalidAuthenticationException(
            f"Invalid authentication type: {connection.authentication}. Must be either 'IAM ROLE' or 'Secrets'")


    def run_download_files(inputs: DownloadFilesInputs, context: Context) -> DownloadFilesOutputs:
        """Download every file under an S3 path and re-upload it to connector storage.

        Args:
            inputs: Connection settings, bucket name and path to read from.
            context: Provides the logger, the storage API and the trace id.

        Returns:
            DownloadFilesOutputs holding one file reference per uploaded file.

        Raises:
            InvalidAuthenticationException: Authentication settings are missing
                or invalid.
            AWSAccessException: The S3 client could not be created, or listing
                the bucket failed.
            InvalidBucketNameException: The bucket does not exist.
            InvalidPathException: An unexpected error occurred while listing
                the path.
            FileDownloadException: No files were found, or an object could not
                be downloaded.
            FileProcessingException: A downloaded file could not be stored.
        """
        if not inputs.connection.authentication:
            raise InvalidAuthenticationException(
                "Authentication type must be specified in the connection configuration")

        try:
            s3_client = _create_s3_client(inputs.connection)
        except InvalidAuthenticationException:
            # Already specific; do not mask it as a generic access failure.
            raise
        except Exception as e:
            raise AWSAccessException(f"Failed to initialize S3 client: {str(e)}") from e

        # Normalize path (remove leading/trailing slashes)
        path = inputs.path.strip('/')

        context.logger.info(
            f"Searching for files in bucket '{inputs.bucket_name}' with path '{path}'")

        try:
            s3_files = list_s3_objects(s3_client, inputs.bucket_name, path)
        except (InvalidBucketNameException, AWSAccessException):
            # Raised deliberately by list_s3_objects; keep the precise type.
            raise
        except Exception as e:
            raise InvalidPathException(f"Failed to access path '{path}': {str(e)}") from e

        if not s3_files:
            # list_objects_v2 does not fail for an unknown prefix, so a missing
            # path and an empty path cannot be told apart here.
            context.logger.error(
                f"No files found at path '{path}' in bucket '{inputs.bucket_name}'")
            raise FileDownloadException(
                f"No files found at path '{path}' in bucket '{inputs.bucket_name}'")

        context.logger.info(f"Found {len(s3_files)} files to process")

        uploaded_files: List[FileReference] = []

        for s3_key in s3_files:
            try:
                context.logger.info(f"Downloading file: {s3_key} from S3")
                response = s3_client.get_object(
                    Bucket=inputs.bucket_name, Key=s3_key)
                file_content = response['Body'].read()

                # NOTE(review): only the last key segment is used, so objects
                # from different prefixes that share a base name map to the
                # same storage path - confirm how storage.upload handles that.
                file_name = s3_key.split('/')[-1]
                context.logger.info(f"Uploading file: {file_name} to storage")
                file = context.storage.create_file(file_name, file_content)
                file_ref = context.storage.upload(
                    file, f's3-upload-connector/{context.trace_id}/{file_name}')
            except ClientError as e:
                raise FileDownloadException(
                    f"Failed to download file {s3_key}: {str(e)}") from e
            except Exception as e:
                raise FileProcessingException(
                    f"Failed to process file {s3_key}: {str(e)}") from e

            if not file_ref:
                raise FileProcessingException(
                    f"Failed to upload file {file_name} to storage")

            uploaded_files.append(file_ref)
            context.logger.info(f"Successfully processed file: {file_name}")

        return DownloadFilesOutputs(files=uploaded_files)

# 4. File - models.py

    # Generated by pf-connectors-py-sdk
    # Date: 2025-02-17 06:55:27 UTC
    # Do not edit this file

    from pf_connectors_py_sdk.core.models import FileReference
    from pydantic import BaseModel, Field
    from typing import List

    class AmazonwebserviceCredentials(BaseModel):
        """Credentials and settings of an Amazon Web Service connection.

        Generated from the schema definition. Instances are frozen and
        unknown fields are rejected.
        """

        model_config = dict(
            frozen=True,
            validate_assignment=True,
            populate_by_name=True,
            extra="forbid",
        )

        # Authentication mode selected on the connection.
        authentication: str
        # Role identifier used for role-based authentication.
        IAM_role_name: str
        # Static access key pair used for key-based authentication.
        access_key_id: str
        secret_access_key: str
        # AWS region the clients are created for.
        region: str

    class DownloadFilesInputs(BaseModel):
        """Input payload of the ``download_files`` feature.

        Generated from the schema definition. Instances are frozen and
        unknown fields are rejected.
        """

        model_config = dict(
            frozen=True,
            validate_assignment=True,
            populate_by_name=True,
            extra="forbid",
        )

        # Connection carrying the AWS credentials.
        connection: AmazonwebserviceCredentials
        # Bucket to read from.
        bucket_name: str
        # Path inside the bucket where the files are located.
        path: str

    class DownloadFilesOutputs(BaseModel):
        """Output payload of the ``download_files`` feature.

        Generated from the schema definition. Instances are frozen and
        unknown fields are rejected.
        """

        model_config = dict(
            frozen=True,
            validate_assignment=True,
            populate_by_name=True,
            extra="forbid",
        )

        # References to the files produced by the feature.
        files: List[FileReference]

# Send Email Connector

1. schema.json
2. pyproject.toml
3. app.py
4. models.py

# 1. schema.json

    {
    "id": "connectors.send-email-connector",
    "name": "send-email-connector",
    "version": 1,
    "tags": [
        "connectors",
        "send_gmail"
    ],
    "features": [
        {
        "send_email": {
            "input_schema": "send_email_request_schema",
            "output_schema": "send_email_response_schema",
            "configuration": null,
            "exceptions": [],
            "displayname": "Email Send Message",
            "hidden": false
        }
        }
    ],
    "configuration": {},
    "exceptions": [
        "connectors.send_email_connector.InvalidConfigurationError"
    ],
    "schema": [
        {
        "send_email_request_schema": {
            "entities": [
            {
                "connection": {
                "datatype": "connection",
                "mandatory": true,
                "hidden": false,
                "displayname": "Choose Connection",
                "properties": {
                    "label": "Choose Connection",
                    "options": [],
                    "placeholder_text": "Select Connection",
                    "component": "connectionSelect",
                    "help_text": "Select a predefined Email connection.",
                    "parameter": true,
                    "value_key": "connection",
                    "display_order": 1,
                    "option_label_key": "name",
                    "option_value_key": "connection_id",
                    "provider_name": "SMTP Service"
                }
                }
            },
            {
                "to_addresses": {
                "datatype": "string",
                "mandatory": true,
                "hidden": false,
                "is_array": true,
                "min_count": 1,
                "max_count": -1,
                "displayname": "To (Recipient Email)",
                "properties": {
                    "label": "To (Recipient Email)",
                    "component": "input",
                    "help_text": "Provide the primary recipient email addresses.",
                    "parameter": true,
                    "value_key": "to_addresses",
                    "placeholder": "Provide Recipient Email addresses",
                    "display_order": 2
                }
                }
            },
            {
                "subject": {
                "datatype": "string",
                "mandatory": true,
                "hidden": false,
                "displayname": "Subject",
                "properties": {
                    "label": "Subject",
                    "component": "input",
                    "help_text": "Enter the email subject.",
                    "parameter": true,
                    "value_key": "subject",
                    "placeholder": "Enter the email subject.",
                    "display_order": 3
                }
                }
            },
            {
                "body": {
                "datatype": "string",
                "mandatory": true,
                "hidden": false,
                "displayname": "Email Body",
                "properties": {
                    "label": "Email Body",
                    "component": "input",
                    "help_text": "Compose the email content (supports HTML and plain text).",
                    "parameter": true,
                    "value_key": "body",
                    "placeholder": "Compose the email content (supports HTML and plain text).",
                    "display_order": 4
                }
                }
            },
            {
                "attachments": {
                "displayname": "Attach files from previous workflow steps.",
                "datatype": "file",
                "is_array": true,
                "mandatory": false,
                "parameters": false,
                "properties": {
                    "value_key": "attachments",
                    "parameter": true,
                    "display_order": 5,
                    "label": "Attachments",
                    "help_text": "Attach files from previous workflow steps."
                },
                "values": [
                    "png",
                    "jpg",
                    "jpeg",
                    "pdf",
                    "tif",
                    "tiff"
                ],
                "entities": {
                    "file_ref_id": {
                    "datatype": "string",
                    "mandatory": true,
                    "hidden": false
                    },
                    "mime_type": {
                    "datatype": "string",
                    "mandatory": false,
                    "hidden": false
                    },
                    "file_name": {
                    "datatype": "string",
                    "mandatory": true,
                    "hidden": false
                    },
                    "file_id": {
                    "datatype": "string",
                    "mandatory": false,
                    "hidden": false
                    }
                }
                }
            }
            ]
        }
        },
        {
        "send_email_response_schema": {
            "entities": [
            {
                "result": {
                "datatype": "string",
                "displayname": "Result",
                "mandatory": true,
                "hidden": false
                }
            }
            ]
        }
        }
    ],
    "deployment": {
        "type": "container",
        "shared": true,
        "properties": {
        "nodeAffinity": "",
        "autoScale": true
        },
        "workers": 2,
        "resources": {
        "cpu": 2,
        "mem": 2
        }
    }
    }                                           

# 2. pyproject.toml

    [project]
    name = "send-email-connector"
    version = "0.0.1"
    description = "Project built using the Purple Fabric Connectors Python SDK for integration with the Purple Fabric Platform."
    readme = "README.md"
    requires-python = ">=3.12"
    dependencies = ["pf-connectors-py-sdk==0.1.111", ]

    [tool.pf-connectors-py-sdk]
    asset_name = "Send Email"
    asset_description = "Send emails with support for attachments."
    asset_category = "INTEGRATOR"

# 3. app.py

    import smtplib
    from email.mime.application import MIMEApplication
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText

    from generated.models import SmtpserviceCredentials, SendEmailInputs, SendEmailOutputs
    from pf_connectors_py_sdk import Context
    from pf_connectors_py_sdk.core.exceptions import AssetException


    class SendEmailException(AssetException):
        """Connector error carrying a human-readable failure message."""

        def __init__(self, message: str) -> None:
            # Delegate unchanged; the base class owns message handling.
            super().__init__(message)


    def run_send_email(inputs: SendEmailInputs, context: Context) -> SendEmailOutputs:
        """Compose an HTML email with optional attachments and send it over SMTP.

        Args:
            inputs: Connection settings, recipients, subject, body and the
                attachment file references.
            context: Provides the storage API used to download attachments.

        Returns:
            SendEmailOutputs whose ``result`` confirms the message was sent.

        Raises:
            SendEmailException: If connecting, STARTTLS, login or sending fails.
        """
        # Create message container
        msg = MIMEMultipart()
        msg['Subject'] = inputs.subject
        msg['From'] = inputs.connection.email_id
        msg['To'] = ', '.join(inputs.to_addresses)

        # The body is always attached as HTML.
        msg.attach(MIMEText(inputs.body, 'html'))

        # Attachments are optional in the schema, so tolerate a missing list.
        for attachment in inputs.attachments or []:
            if not (attachment.file_ref_id and attachment.file_name):
                continue

            # Download file from storage using file_ref_id
            file_content = context.storage.download(file_ref_id=attachment.file_ref_id)

            # MIMEApplication expects only the subtype ("pdf"), so reduce a
            # full type such as "application/pdf" to its last segment and fall
            # back to a generic binary subtype when none is given.
            mime_type = (attachment.mime_type or '').split(';', 1)[0].strip()
            subtype = mime_type.rpartition('/')[2] or 'octet-stream'

            part = MIMEApplication(file_content.content, _subtype=subtype)
            part.add_header('Content-Disposition', 'attachment', filename=attachment.file_name)
            msg.attach(part)

        try:
            # The context manager issues QUIT and closes the socket even when
            # STARTTLS, login or sending raises, so the connection never leaks.
            with smtplib.SMTP(inputs.connection.server_url, inputs.connection.server_port) as server:
                server.starttls()
                server.login(inputs.connection.email_id, inputs.connection.password)
                server.send_message(msg)
        except Exception as e:
            raise SendEmailException(f"Failed to send email: {str(e)}") from e

        return SendEmailOutputs(result="Email sent successfully")

# 4. models.py

    # Generated by pf-connectors-py-sdk
    # Date: 2025-03-20 09:56:25 UTC
    # Do not edit this file

    from typing import List

    from pf_connectors_py_sdk.core.models import FileReference
    from pydantic import BaseModel


    class SmtpserviceCredentials(BaseModel):
        """Credentials and settings of an SMTP Service connection.

        Generated from the schema definition. Instances are frozen and
        unknown fields are rejected.
        """

        model_config = dict(
            frozen=True,
            validate_assignment=True,
            populate_by_name=True,
            extra="forbid",
        )

        # Secret used together with email_id to log in.
        password: str
        # Host and port of the SMTP server.
        server_url: str
        server_port: int
        # Provider label stored on the connection.
        email_provider: str
        # Account address; also used as the sender.
        email_id: str


    class SendEmailInputs(BaseModel):
        """Input payload of the ``send_email`` feature.

        Generated from the schema definition. Instances are frozen and
        unknown fields are rejected.
        """

        model_config = dict(
            frozen=True,
            validate_assignment=True,
            populate_by_name=True,
            extra="forbid",
        )

        # Connection carrying the SMTP credentials.
        connection: SmtpserviceCredentials
        # Recipient addresses.
        to_addresses: List[str]
        # Subject line and message body.
        subject: str
        body: str
        # References to the files to attach.
        attachments: List[FileReference]


    class SendEmailOutputs(BaseModel):
        """Output payload of the ``send_email`` feature.

        Generated from the schema definition. Instances are frozen and
        unknown fields are rejected.
        """

        model_config = dict(
            frozen=True,
            validate_assignment=True,
            populate_by_name=True,
            extra="forbid",
        )

        # Status text describing the outcome.
        result: str