# Examples of Production-Ready Connectors
## S3 Download Connector

The connector is made up of four files:

1. schema.json
2. pyproject.toml
3. app.py
4. models.py
### 1. File - schema.json
{
  "id": "connectors.s3_download",
  "name": "S3 Download",
  "version": 1,
  "tags": [
    "connectors",
    "storage",
    "download_files"
  ],
  "features": [
    {
      "download_files": {
        "input_schema": "download_files_request_schema",
        "output_schema": "download_files_response_schema",
        "configuration": null,
        "exceptions": [
          "connectors.s3_connector.S3FileDownloadError"
        ],
        "displayname": "Read from S3",
        "hidden": false
      }
    }
  ],
  "configuration": {},
  "exceptions": [
    "connectors.s3_connector.InvalidConfigurationError",
    "connectors.s3_connector.ConfigurationLoadError"
  ],
  "schema": [
    {
      "download_files_request_schema": {
        "entities": [
          {
            "connection": {
              "datatype": "connection",
              "mandatory": true,
              "hidden": false,
              "displayname": "Connection",
              "properties": {
                "label": "Connection",
                "options": [],
                "placeholder_text": "Select Connection",
                "component": "connectionSelect",
                "help_text": "Choose an S3 connection from the available list.",
                "parameter": true,
                "value_key": "connection",
                "display_order": 1,
                "option_label_key": "name",
                "option_value_key": "connection_id",
                "provider_name": "Amazon Web Service"
              }
            }
          },
          {
            "bucket_name": {
              "datatype": "string",
              "mandatory": true,
              "hidden": false,
              "displayname": "Bucket Name",
              "properties": {
                "label": "Bucket Name",
                "component": "input",
                "help_text": "Enter the name of the S3 bucket to download files from.",
                "parameter": true,
                "value_key": "bucket_name",
                "placeholder": "Provide Bucket Name",
                "display_order": 2
              }
            }
          },
          {
            "path": {
              "datatype": "string",
              "mandatory": true,
              "hidden": false,
              "displayname": "Path",
              "properties": {
                "label": "Path",
                "component": "input",
                "help_text": "Specify the path within the bucket to locate the files.",
                "parameter": true,
                "value_key": "path",
                "placeholder": "Provide Path",
                "display_order": 3
              }
            }
          }
        ]
      }
    },
    {
      "download_files_response_schema": {
        "entities": [
          {
            "files": {
              "datatype": "file",
              "displayname": "Files",
              "mandatory": true,
              "hidden": false,
              "values": [
                "png",
                "jpg",
                "jpeg",
                "pdf",
                "tif",
                "tiff"
              ],
              "is_array": true,
              "min_count": 1,
              "max_count": -1,
              "entities": {
                "file_name": {
                  "datatype": "string",
                  "mandatory": true,
                  "hidden": false
                },
                "file_ref_id": {
                  "datatype": "string",
                  "mandatory": true,
                  "hidden": false
                },
                "file_id": {
                  "datatype": "string",
                  "mandatory": false,
                  "hidden": false
                },
                "mime_type": {
                  "datatype": "string",
                  "mandatory": false,
                  "hidden": false
                },
                "total_pages": {
                  "datatype": "number",
                  "mandatory": true,
                  "hidden": false
                }
              }
            }
          }
        ]
      }
    }
  ],
  "deployment": {
    "type": "container",
    "shared": true,
    "properties": {
      "nodeAffinity": "",
      "autoScale": true
    },
    "workers": 2,
    "resources": {
      "cpu": 2,
      "mem": 2
    }
  }
}
### 2. File - pyproject.toml
[project]
name = "read-from-s3-connector"
version = "0.0.1"
description = "Project built using the Purple Fabric Connectors Python SDK for integration with the Purple Fabric Platform."
readme = "README.md"
requires-python = ">=3.12"
dependencies = ["pf-connectors-py-sdk==0.1.111", "boto3"]

[tool.pf-connectors-py-sdk]
asset_name = "S3 Download"
asset_category = "INTEGRATOR"
asset_description = "Retrieve files from an Amazon S3 bucket."
### 3. File - app.py
from typing import List
import os

import boto3
from botocore.exceptions import ClientError

from generated.models import DownloadFilesInputs, DownloadFilesOutputs
from pf_connectors_py_sdk import Context
from pf_connectors_py_sdk.core.models import FileReference
from exceptions import (
    AWSAccessException,
    InvalidBucketNameException,
    InvalidPathException,
    FileDownloadException,
    FileProcessingException,
    InvalidAuthenticationException
)


def list_s3_objects(s3_client, bucket: str, prefix: str, recursive: bool = False) -> List[str]:
    """List objects in S3 bucket with given prefix. Can handle both file and directory paths."""
    try:
        # Make a single list_objects_v2 call with appropriate parameters
        params = {
            'Bucket': bucket,
            # Remove trailing slash to handle both file and directory cases
            'Prefix': prefix.rstrip('/'),
        }
        # First try without delimiter to check if there are any files
        response = s3_client.list_objects_v2(**params)
        # If no files found with direct prefix, try with trailing slash
        if 'Contents' not in response:
            params['Prefix'] = f"{prefix.rstrip('/')}/"
            response = s3_client.list_objects_v2(**params)
        # If still no files and not recursive, try with delimiter
        if not recursive and 'Contents' not in response:
            params['Delimiter'] = '/'
            response = s3_client.list_objects_v2(**params)
        files = []
        # Process all contents (files) in the response
        for obj in response.get('Contents', []):
            key = obj['Key']
            if not key.endswith('/'):  # Skip directory markers
                files.append(key)
        # If we're not in recursive mode and found no direct files,
        # but have CommonPrefixes, switch to recursive mode for this case
        if not recursive and not files and 'CommonPrefixes' in response:
            # Try one more time with recursive listing
            return list_s3_objects(s3_client, bucket, prefix, recursive=True)
        return sorted(files)  # Return sorted list for consistent results
    except ClientError as e:
        if e.response['Error']['Code'] == 'NoSuchBucket':
            raise InvalidBucketNameException(
                f"Bucket '{bucket}' does not exist")
        raise AWSAccessException(f"Failed to list objects: {str(e)}")


def run_download_files(inputs: DownloadFilesInputs, context: Context) -> DownloadFilesOutputs:
    # Validate authentication configuration
    if not inputs.connection.authentication:
        raise InvalidAuthenticationException(
            "Authentication type must be specified in the connection configuration")

    # Initialize S3 client with appropriate authentication
    try:
        if inputs.connection.authentication == "IAM ROLE":
            # Use IAM role-based authentication
            if not inputs.connection.IAM_role_name:
                raise InvalidAuthenticationException(
                    "IAM role name must be provided for IAM Role authentication")
            # First, assume the outbound role in the current account
            outbound_role_arn = os.environ.get('AWS_OUTBOUND_IAM_ROLE')
            if not outbound_role_arn:
                raise InvalidAuthenticationException(
                    "AWS_OUTBOUND_IAM_ROLE environment variable must be set for cross-account access")
            sts_client = boto3.client('sts')
            # Step 1: Assume the outbound role
            outbound_role = sts_client.assume_role(
                RoleArn=outbound_role_arn,
                RoleSessionName="S3ConnectorOutboundSession"
            )
            # Create STS client with outbound role credentials
            sts_client_outbound = boto3.client(
                'sts',
                aws_access_key_id=outbound_role['Credentials']['AccessKeyId'],
                aws_secret_access_key=outbound_role['Credentials']['SecretAccessKey'],
                aws_session_token=outbound_role['Credentials']['SessionToken'],
                region_name=inputs.connection.region
            )
            # Step 2: Use the outbound role to assume the destination role
            destination_role = sts_client_outbound.assume_role(
                RoleArn=inputs.connection.IAM_role_name,
                RoleSessionName="S3ConnectorDestinationSession"
            )
            # Get final credentials from destination role
            credentials = destination_role['Credentials']
            s3_client = boto3.client(
                's3',
                aws_access_key_id=credentials['AccessKeyId'],
                aws_secret_access_key=credentials['SecretAccessKey'],
                aws_session_token=credentials['SessionToken'],
                region_name=inputs.connection.region
            )
        elif inputs.connection.authentication == "Secrets":
            # Use access key based authentication
            if not inputs.connection.access_key_id or not inputs.connection.secret_access_key:
                raise InvalidAuthenticationException(
                    "Access key ID and secret access key must be provided for Secrets authentication")
            s3_client = boto3.client(
                's3',
                aws_access_key_id=inputs.connection.access_key_id,
                aws_secret_access_key=inputs.connection.secret_access_key,
                region_name=inputs.connection.region
            )
        else:
            raise InvalidAuthenticationException(
                f"Invalid authentication type: {inputs.connection.authentication}. Must be either 'IAM Role' or 'Secrets'")
    except ClientError as e:
        raise AWSAccessException(f"Failed to initialize S3 client: {str(e)}")
    except Exception as e:
        raise AWSAccessException(f"Failed to initialize S3 client: {str(e)}")

    # Normalize path (remove leading/trailing slashes)
    path = inputs.path.strip('/')
    context.logger.info(
        f"Searching for files in bucket '{inputs.bucket_name}' with path '{path}'")

    # List files in S3
    try:
        s3_files = list_s3_objects(s3_client, inputs.bucket_name, path)
        if not s3_files:
            # Try to list the prefix to see if it exists
            try:
                s3_client.list_objects_v2(
                    Bucket=inputs.bucket_name,
                    Prefix=path,
                    MaxKeys=1
                )
                context.logger.error(
                    f"Path '{path}' exists but no files were found")
            except ClientError:
                context.logger.error(f"Path '{path}' does not exist in bucket")
            raise FileDownloadException(
                f"No files found at path '{path}' in bucket '{inputs.bucket_name}'")
        context.logger.info(f"Found {len(s3_files)} files to process")

        # Download files from S3 and upload to storage
        uploaded_files: List[FileReference] = []
        for s3_key in s3_files:
            try:
                # Download file from S3
                context.logger.info(f"Downloading file: {s3_key} from S3")
                response = s3_client.get_object(
                    Bucket=inputs.bucket_name, Key=s3_key)
                file_content = response['Body'].read()
                # Upload to storage
                file_name = s3_key.split('/')[-1]
                context.logger.info(f"Uploading file: {file_name} to storage")
                file = context.storage.create_file(file_name, file_content)
                file_ref = context.storage.upload(
                    file, f's3-upload-connector/{context.trace_id}/{file_name}')
                if file_ref:
                    uploaded_files.append(file_ref)
                    context.logger.info(
                        f"Successfully processed file: {file_name}")
                else:
                    raise FileProcessingException(
                        f"Failed to upload file {file_name} to storage")
            except ClientError as e:
                raise FileDownloadException(
                    f"Failed to download file {s3_key}: {str(e)}")
            except Exception as e:
                raise FileProcessingException(
                    f"Failed to process file {s3_key}: {str(e)}")
        if not uploaded_files:
            raise FileProcessingException(
                "No files were successfully processed")
        return DownloadFilesOutputs(files=uploaded_files)
    except (InvalidBucketNameException, AWSAccessException, FileDownloadException, FileProcessingException):
        # Re-raise connector-specific errors so they are not masked as path errors
        raise
    except Exception as e:
        raise InvalidPathException(f"Failed to access path '{path}': {str(e)}")
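The listing helper above can be exercised locally without touching a real bucket. Below is a minimal sketch that stubs the `list_objects_v2` call with `botocore.stub.Stubber`; the bucket name, prefix, object keys, and dummy credentials are illustrative only, and it assumes `app.py` is importable from the connector project (so `exceptions.py` and the `generated` package are on the path).

```python
# Minimal local check of list_s3_objects against a stubbed S3 client (no network access).
import boto3
from botocore.stub import Stubber

from app import list_s3_objects  # assumes the connector project layout shown above

# Dummy credentials and region: no real request is ever signed or sent.
s3 = boto3.client(
    "s3",
    region_name="us-east-1",
    aws_access_key_id="test",
    aws_secret_access_key="test",
)

stubber = Stubber(s3)
# First call made by list_s3_objects: Prefix without a trailing slash.
stubber.add_response(
    "list_objects_v2",
    {"Contents": [{"Key": "invoices/2025/a.pdf"}, {"Key": "invoices/2025/b.pdf"}]},
    {"Bucket": "example-bucket", "Prefix": "invoices/2025"},
)

with stubber:
    keys = list_s3_objects(s3, "example-bucket", "invoices/2025")

assert keys == ["invoices/2025/a.pdf", "invoices/2025/b.pdf"]
```

Because the first response already contains `Contents`, only one stubbed call is needed; exercising the trailing-slash and delimiter fallbacks would require queuing additional responses.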
### 4. File - models.py
# Generated by pf-connectors-py-sdk
# Date: 2025-02-17 06:55:27 UTC
# Do not edit this file
from pf_connectors_py_sdk.core.models import FileReference
from pydantic import BaseModel, Field
from typing import List


class AmazonwebserviceCredentials(BaseModel):
    """Pydantic model for AmazonwebserviceCredentials.
    Generated automatically from schema definition.
    """
    model_config = {
        "frozen": True,
        "validate_assignment": True,
        "populate_by_name": True,
        "extra": "forbid"
    }
    authentication: str
    IAM_role_name: str
    access_key_id: str
    secret_access_key: str
    region: str


class DownloadFilesInputs(BaseModel):
    """Pydantic model for DownloadFilesInputs.
    Generated automatically from schema definition.
    """
    model_config = {
        "frozen": True,
        "validate_assignment": True,
        "populate_by_name": True,
        "extra": "forbid"
    }
    connection: AmazonwebserviceCredentials
    bucket_name: str
    path: str


class DownloadFilesOutputs(BaseModel):
    """Pydantic model for DownloadFilesOutputs.
    Generated automatically from schema definition.
    """
    model_config = {
        "frozen": True,
        "validate_assignment": True,
        "populate_by_name": True,
        "extra": "forbid"
    }
    files: List[FileReference]
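For a quick sanity check, the generated input models can be constructed by hand. The values below are placeholders, not working credentials; note that every field on `AmazonwebserviceCredentials` is a required string, so fields that do not apply to the chosen authentication type still have to be supplied (here as empty strings).

```python
# Illustrative construction of the generated input model (placeholder values only).
from generated.models import AmazonwebserviceCredentials, DownloadFilesInputs

inputs = DownloadFilesInputs(
    connection=AmazonwebserviceCredentials(
        authentication="Secrets",
        IAM_role_name="",                # unused for "Secrets" auth, but required by the model
        access_key_id="AKIA...",         # placeholder
        secret_access_key="...",         # placeholder
        region="us-east-1",
    ),
    bucket_name="example-bucket",
    path="invoices/2025/",
)
```

Because the models are frozen and declare `extra: "forbid"`, a mistyped field name (for example `bucket` instead of `bucket_name`) fails immediately with a `ValidationError` instead of being silently ignored.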
## Send Email Connector

The connector is made up of four files:

1. schema.json
2. pyproject.toml
3. app.py
4. models.py
### 1. File - schema.json
{
  "id": "connectors.send-email-connector",
  "name": "send-email-connector",
  "version": 1,
  "tags": [
    "connectors",
    "send_gmail"
  ],
  "features": [
    {
      "send_email": {
        "input_schema": "send_email_request_schema",
        "output_schema": "send_email_response_schema",
        "configuration": null,
        "exceptions": [],
        "displayname": "Email Send Message",
        "hidden": false
      }
    }
  ],
  "configuration": {},
  "exceptions": [
    "connectors.send_email_connector.InvalidConfigurationError"
  ],
  "schema": [
    {
      "send_email_request_schema": {
        "entities": [
          {
            "connection": {
              "datatype": "connection",
              "mandatory": true,
              "hidden": false,
              "displayname": "Choose Connection",
              "properties": {
                "label": "Choose Connection",
                "options": [],
                "placeholder_text": "Select Connection",
                "component": "connectionSelect",
                "help_text": "Select a predefined Email connection.",
                "parameter": true,
                "value_key": "connection",
                "display_order": 1,
                "option_label_key": "name",
                "option_value_key": "connection_id",
                "provider_name": "SMTP Service"
              }
            }
          },
          {
            "to_addresses": {
              "datatype": "string",
              "mandatory": true,
              "hidden": false,
              "is_array": true,
              "min_count": 1,
              "max_count": -1,
              "displayname": "To (Recipient Email)",
              "properties": {
                "label": "To (Recipient Email)",
                "component": "input",
                "help_text": "Provide the primary recipient email addresses.",
                "parameter": true,
                "value_key": "to_addresses",
                "placeholder": "Provide Recipient Email addresses",
                "display_order": 2
              }
            }
          },
          {
            "subject": {
              "datatype": "string",
              "mandatory": true,
              "hidden": false,
              "displayname": "Subject",
              "properties": {
                "label": "Subject",
                "component": "input",
                "help_text": "Enter the email subject.",
                "parameter": true,
                "value_key": "subject",
                "placeholder": "Enter the email subject.",
                "display_order": 3
              }
            }
          },
          {
            "body": {
              "datatype": "string",
              "mandatory": true,
              "hidden": false,
              "displayname": "Email Body",
              "properties": {
                "label": "Email Body",
                "component": "input",
                "help_text": "Compose the email content (supports HTML and plain text).",
                "parameter": true,
                "value_key": "body",
                "placeholder": "Compose the email content (supports HTML and plain text).",
                "display_order": 4
              }
            }
          },
          {
            "attachments": {
"displayname": "Attach files from previous workflow steps.",
"datatype": "file",
"is_array": true,
"mandatory": false,
"parameters": false,
"properties": {
"value_key": "attachments",
"parameter": true,
"display_order": 5,
"label": "Attachments",
"help_text": "Attach files from previous workflow steps."
},
"values": [
"png",
"jpg",
"jpeg",
"pdf",
"tif",
"tiff"
],
"entities": {
"file_ref_id": {
"datatype": "string",
"mandatory": true,
"hidden": false
},
"mime_type": {
"datatype": "string",
"mandatory": false,
"hidden": false
},
"file_name": {
"datatype": "string",
"mandatory": true,
"hidden": false
},
"file_id": {
"datatype": "string",
"mandatory": false,
"hidden": false
}
}
}
}
]
}
},
{
"send_email_response_schema": {
"entities": [
{
"result": {
"datatype": "string",
"displayname": "Result",
"mandatory": true,
"hidden": false
}
}
]
}
}
],
"deployment": {
"type": "container",
"shared": true,
"properties": {
"nodeAffinity": "",
"autoScale": true
},
"workers": 2,
"resources": {
"cpu": 2,
"mem": 2
}
}
}
### 2. File - pyproject.toml
[project]
name = "send-email-connector"
version = "0.0.1"
description = "Project built using the Purple Fabric Connectors Python SDK for integration with the Purple Fabric Platform."
readme = "README.md"
requires-python = ">=3.12"
dependencies = ["pf-connectors-py-sdk==0.1.111"]

[tool.pf-connectors-py-sdk]
asset_name = "Send Email"
asset_description = "Send emails with support for attachments."
asset_category = "INTEGRATOR"
### 3. File - app.py
import smtplib
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

from generated.models import SmtpserviceCredentials, SendEmailInputs, SendEmailOutputs
from pf_connectors_py_sdk import Context
from pf_connectors_py_sdk.core.exceptions import AssetException


class SendEmailException(AssetException):
    def __init__(self, message):
        super().__init__(message)


def run_send_email(inputs: SendEmailInputs, context: Context) -> SendEmailOutputs:
    # Create message container
    msg = MIMEMultipart()
    msg['Subject'] = inputs.subject
    msg['From'] = inputs.connection.email_id
    msg['To'] = ', '.join(inputs.to_addresses)

    # Add body
    msg.attach(MIMEText(inputs.body, 'html'))

    # Add attachments if any
    for attachment in inputs.attachments:
        if attachment.file_ref_id and attachment.file_name:
            # Download file from storage using file_ref_id
            file_content = context.storage.download(file_ref_id=attachment.file_ref_id)
            # Create MIME attachment part directly from the downloaded content
            part = MIMEApplication(file_content.content, _subtype=attachment.mime_type)
            part.add_header('Content-Disposition', 'attachment', filename=attachment.file_name)
            msg.attach(part)

    try:
        # Connect to SMTP server
        server = smtplib.SMTP(inputs.connection.server_url, inputs.connection.server_port)
        server.starttls()
        # Login
        server.login(inputs.connection.email_id, inputs.connection.password)
        # Send email
        server.send_message(msg)
        server.quit()
        return SendEmailOutputs(result="Email sent successfully")
    except Exception as e:
        raise SendEmailException(f"Failed to send email: {str(e)}")
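One caveat in `run_send_email` above: if `starttls`, `login`, or `send_message` raises, `server.quit()` is never reached and the connection is left open. A variant that lets `smtplib.SMTP` act as a context manager (it closes the session on exit) is sketched below; it reuses the imports and `SendEmailException` from this file and is an alternative shape, not a pattern prescribed by the SDK.

```python
def send_prepared_message(msg, connection) -> str:
    """Send an already-built MIME message, closing the SMTP connection on success or failure."""
    try:
        # smtplib.SMTP implements the context-manager protocol and quits the session on exit.
        with smtplib.SMTP(connection.server_url, connection.server_port) as server:
            server.starttls()
            server.login(connection.email_id, connection.password)
            server.send_message(msg)
        return "Email sent successfully"
    except Exception as e:
        raise SendEmailException(f"Failed to send email: {str(e)}")
```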
### 4. File - models.py
# Generated by pf-connectors-py-sdk
# Date: 2025-03-20 09:56:25 UTC
# Do not edit this file
from typing import List

from pf_connectors_py_sdk.core.models import FileReference
from pydantic import BaseModel


class SmtpserviceCredentials(BaseModel):
    """Pydantic model for SmtpserviceCredentials.
    Generated automatically from schema definition.
    """
    model_config = {
        "frozen": True,
        "validate_assignment": True,
        "populate_by_name": True,
        "extra": "forbid",
    }
    password: str
    server_url: str
    server_port: int
    email_provider: str
    email_id: str


class SendEmailInputs(BaseModel):
    """Pydantic model for SendEmailInputs.
    Generated automatically from schema definition.
    """
    model_config = {
        "frozen": True,
        "validate_assignment": True,
        "populate_by_name": True,
        "extra": "forbid",
    }
    connection: SmtpserviceCredentials
    to_addresses: List[str]
    subject: str
    body: str
    attachments: List[FileReference]


class SendEmailOutputs(BaseModel):
    """Pydantic model for SendEmailOutputs.
    Generated automatically from schema definition.
    """
    model_config = {
        "frozen": True,
        "validate_assignment": True,
        "populate_by_name": True,
        "extra": "forbid",
    }
    result: str
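As with the S3 connector, the generated models can be constructed directly for local experimentation. The snippet below uses placeholder values and leaves `attachments` empty, since `FileReference` instances normally come from the platform's storage layer rather than being built by hand.

```python
# Illustrative construction of the generated input model (placeholder values only).
from generated.models import SmtpserviceCredentials, SendEmailInputs

inputs = SendEmailInputs(
    connection=SmtpserviceCredentials(
        password="app-password",          # placeholder
        server_url="smtp.example.com",
        server_port=587,
        email_provider="SMTP Service",
        email_id="noreply@example.com",
    ),
    to_addresses=["recipient@example.com"],
    subject="Weekly report",
    body="<p>Hello from the Send Email connector.</p>",
    attachments=[],
)
```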