Skip to content

Use AWS SDK To Integrate AWS IAM

Connecting with GLIDE to an ElastiCache or MemoryDB cluster that is configured with IAM authentication requires:

  1. Generating an IAM Authentication token using AWS SDK.
  2. Initializing the GLIDE client by passing the IAM token as part of the credentials configuration.
  3. Refreshing the token periodically with GLIDE’s Dynamic Password Update Feature.

This guide shows how to use the AWS SDK to generate the IAM token. Please refer to the AWS SDK docs for a detailed explanation of IAM token generation.

from typing import Tuple, Union
from urllib.parse import ParseResult, urlencode, urlunparse
import botocore.session
from botocore.model import ServiceId
from botocore.signers import RequestSigner
from cachetools import TTLCache, cached
import valkey
class ElastiCacheIAMProvider(valkey.CredentialProvider):
    """Credential provider that produces IAM authentication tokens for ElastiCache.

    The token is a SigV4-presigned ``connect`` URL for the cluster, with the
    ``https://`` prefix stripped. It is used as the password when authenticating.
    """

    # Generated IAM tokens are valid for 15 minutes.
    TOKEN_LIFETIME_SECONDS = 900
    # Evict cached tokens slightly before they expire so that callers are never
    # handed a token that is about to become (or has just become) invalid.
    CACHE_TTL_SECONDS = TOKEN_LIFETIME_SECONDS - 60

    def __init__(self, user, cluster_name, region="us-east-1"):
        """Store the connection identity and build the SigV4 request signer.

        Args:
            user: User name placed in the ``User`` query parameter of the token.
            cluster_name: Cluster name used as the host part of the signed URL.
            region: AWS region used for signing.
        """
        self.user = user
        self.cluster_name = cluster_name
        self.region = region

        session = botocore.session.get_session()
        self.request_signer = RequestSigner(
            ServiceId("elasticache"),
            self.region,
            "elasticache",
            "v4",
            session.get_credentials(),
            session.get_component("event_emitter"),
        )

    @cached(cache=TTLCache(maxsize=128, ttl=CACHE_TTL_SECONDS))
    def get_credentials(self) -> Tuple[str, str]:
        """Return ``(user, iam_token)``.

        Results are cached for ``CACHE_TTL_SECONDS``; within that window repeated
        calls return the same token instead of signing a new one.
        """
        query_params = {"Action": "connect", "User": self.user}
        url = urlunparse(
            ParseResult(
                scheme="https",
                netloc=self.cluster_name,
                path="/",
                query=urlencode(query_params),
                params="",
                fragment="",
            )
        )
        signed_url = self.request_signer.generate_presigned_url(
            {"method": "GET", "url": url, "body": {}, "headers": {}, "context": {}},
            operation_name="connect",
            expires_in=self.TOKEN_LIFETIME_SECONDS,
            region_name=self.region,
        )
        # ElastiCache expects to receive the URL without the protocol prefix
        return (self.user, signed_url.removeprefix("https://"))
from typing import Tuple, Union
import asyncio
from glide import (
GlideClusterClient,
GlideClusterClientConfiguration,
ServerCredentials,
NodeAddress,
)
async def main():
    """Connect to a cluster with an IAM token, then refresh the connection password.

    Placeholder values (user name, cluster name, endpoint) must be replaced with
    real ones before running.
    """
    username = "your-username"
    cluster_name = "your-cluster-name"
    auth = ElastiCacheIAMProvider(user=username, cluster_name=cluster_name, region="us-east-1")

    # The IAM token is passed as the password of the server credentials.
    _, iam_token = auth.get_credentials()
    valkey_credentials = ServerCredentials(
        username=username,
        password=iam_token,
    )
    addresses = [NodeAddress("example-cluster-endpoint.use1.cache.amazonaws.com", 6379)]
    config = GlideClusterClientConfiguration(
        addresses=addresses,
        use_tls=True,
        credentials=valkey_credentials,
    )

    client = await GlideClusterClient.create(config)
    try:
        # Update password dynamically. get_credentials() is cached by the
        # provider, so this yields a new token only once the cached one expired.
        _, new_iam_token = auth.get_credentials()
        await client.update_connection_password(new_iam_token)
        # To perform immediate re-authentication, set the second parameter to true
        await client.update_connection_password(new_iam_token, immediate_auth=True)
    finally:
        # Always release the connection, even if the password update fails.
        await client.close()

Legacy: IAM Authentication with AWS SDK for Java v1

Section titled “Legacy: IAM Authentication with AWS SDK for Java v1”
Click to expand
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.http.HttpMethodName;
import com.amazonaws.Request;
import com.amazonaws.DefaultRequest;
import com.amazonaws.auth.AWS4Signer;
import org.apache.http.NameValuePair;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.client.utils.URIBuilder;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Builds IAM authentication tokens for an ElastiCache cache using AWS SDK for Java v1.
 *
 * <p>The token is a SigV4-presigned {@code connect} request URI with the protocol prefix
 * removed.
 */
public class ElastiCacheIAMProvider {
    private static final HttpMethodName REQUEST_METHOD = HttpMethodName.GET;
    private static final String REQUEST_PROTOCOL = "http://";
    private static final String PARAM_ACTION = "Action";
    private static final String PARAM_USER = "User";
    private static final String PARAM_RESOURCE_TYPE = "ResourceType";
    private static final String RESOURCE_TYPE_SERVERLESS_CACHE = "ServerlessCache";
    private static final String ACTION_NAME = "connect";
    private static final String SERVICE_NAME = "elasticache";
    private static final long TOKEN_EXPIRY_SECONDS = 900;

    private final String userId;
    private final String cacheName;
    private final String region;
    private final boolean isServerless;

    /**
     * @param userId user placed in the {@code User} request parameter
     * @param cacheName cache name used as the host of the signed request
     * @param region AWS region used for signing
     * @param isServerless when true, the {@code ResourceType=ServerlessCache} parameter is added
     */
    public ElastiCacheIAMProvider(String userId, String cacheName, String region, boolean isServerless) {
        this.userId = userId;
        this.cacheName = cacheName;
        this.region = region;
        this.isServerless = isServerless;
    }

    /**
     * Creates and presigns the connect request, then renders it as a string without the
     * protocol prefix.
     *
     * @return the signed request URI to use as the IAM token
     * @throws URISyntaxException if the signed request cannot be assembled into a URI
     */
    public String toSignedRequestUri() throws URISyntaxException {
        // Credentials are resolved through the default provider chain on every call.
        AWSCredentials awsCredentials = new DefaultAWSCredentialsProviderChain().getCredentials();

        Request<Void> connectRequest = buildConnectRequest();
        presign(connectRequest, awsCredentials);

        URI signedUri = new URIBuilder(connectRequest.getEndpoint())
                .addParameters(firstValues(connectRequest.getParameters()))
                .build();
        return signedUri.toString().replace(REQUEST_PROTOCOL, "");
    }

    /** Assembles the unsigned GET request carrying the action, user and optional resource type. */
    private Request<Void> buildConnectRequest() {
        Request<Void> connectRequest = new DefaultRequest<>(SERVICE_NAME);
        connectRequest.setHttpMethod(REQUEST_METHOD);
        connectRequest.setEndpoint(URI.create(REQUEST_PROTOCOL + cacheName + "/"));
        connectRequest.addParameter(PARAM_ACTION, ACTION_NAME);
        connectRequest.addParameter(PARAM_USER, userId);
        if (isServerless) {
            connectRequest.addParameter(PARAM_RESOURCE_TYPE, RESOURCE_TYPE_SERVERLESS_CACHE);
        }
        return connectRequest;
    }

    /** Presigns the request so that it expires {@code TOKEN_EXPIRY_SECONDS} from now. */
    private void presign(Request<?> connectRequest, AWSCredentials awsCredentials) {
        AWS4Signer sigV4 = new AWS4Signer();
        sigV4.setRegionName(region);
        sigV4.setServiceName(SERVICE_NAME);

        DateTime expiresAt = DateTime.now().plus(Duration.standardSeconds(TOKEN_EXPIRY_SECONDS));
        sigV4.presignRequest(connectRequest, awsCredentials, expiresAt.toDate());
    }

    /** Maps each parameter name to a name/value pair holding only its first value. */
    private static List<NameValuePair> firstValues(Map<String, List<String>> parameters) {
        return parameters.entrySet().stream()
                .<NameValuePair>map(entry ->
                        new BasicNameValuePair(entry.getKey(), entry.getValue().get(0)))
                .collect(Collectors.toList());
    }
}
import glide.api.GlideClusterClient;
import glide.api.models.configuration.NodeAddress;
import glide.api.models.configuration.GlideClusterClientConfiguration;
import glide.api.models.configuration.ServerCredentials;
import static glide.api.models.GlideString.gs;
import java.util.concurrent.ExecutionException;
/**
 * Example: connect to a cluster with an IAM token and refresh the connection password.
 *
 * <p>Placeholder values (host, cache name, user name) must be replaced before running.
 */
public class Main {
    public static void main(String[] args) {
        runGlideExamples();
    }

    /**
     * Creates a cluster client authenticated with an IAM token, then updates the connection
     * password twice: once without and once with immediate re-authentication.
     */
    private static void runGlideExamples() {
        String host = "example-cluster-endpoint.use1.cache.amazonaws.com";
        String cacheName = "your-cluster-name";
        String username = "your-username";
        String region = "us-east-1";
        int port = 6379;
        boolean useSsl = true;
        boolean isServerless = true;

        ElastiCacheIAMProvider iamAuthTokenProvider =
                new ElastiCacheIAMProvider(username, cacheName, region, isServerless);

        try {
            // The token is a credential, so it is deliberately not printed.
            String iamAuthToken = iamAuthTokenProvider.toSignedRequestUri();

            ServerCredentials credentials = ServerCredentials.builder()
                    .username(username)
                    .password(iamAuthToken)
                    .build();
            GlideClusterClientConfiguration config =
                    GlideClusterClientConfiguration.builder()
                            .address(NodeAddress.builder()
                                    .host(host)
                                    .port(port)
                                    .build())
                            .useTLS(useSsl)
                            .credentials(credentials)
                            .build();

            // try-with-resources closes the client even when a command fails.
            try (GlideClusterClient client = GlideClusterClient.createClient(config).get()) {
                System.out.println("PING: " + client.ping(gs("PING")).get());

                // Update password dynamically. Wait for the returned future so that a
                // failed update is reported instead of being silently dropped.
                String newIAMAuthToken = iamAuthTokenProvider.toSignedRequestUri();
                client.updateConnectionPassword(newIAMAuthToken, false).get();
                System.out.println("PING: " + client.ping(gs("PING")).get());

                // To perform immediate re-authentication, set the second parameter to true
                client.updateConnectionPassword(newIAMAuthToken, true).get();
                System.out.println("PING: " + client.ping(gs("PING")).get());
            } catch (ExecutionException e) {
                System.out.println("Glide example failed with an exception: ");
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can still observe the interruption.
                Thread.currentThread().interrupt();
                System.out.println("Glide example failed with an exception: ");
                e.printStackTrace();
            }
        } catch (java.net.URISyntaxException e) {
            System.out.println("URI Syntax error: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

GLIDE supports dynamic updates to the password in the connection configuration at runtime (see Dynamic Password Management for more details). This capability is required when performing IAM authentication with AWS SDK because:

  • IAM tokens are only valid for 15 minutes.
  • Connections authenticated with IAM must be re-authenticated every 12 hours using the AUTH or HELLO command with a fresh IAM token. Otherwise, the connection will be terminated.

To efficiently manage this, GLIDE provides the ability to update short-lived tokens using updateConnectionPassword (or update_connection_password) to periodically refresh the token without immediately re-authenticating. It also offers the option for immediate re-authentication using immediateAuth (or immediate_auth), which forces re-authentication and extends the connection for another 12 hours.