Ansluta till Apache Kafka på HDInsight via Azure Virtual Network
Lär dig hur du ansluter direkt till Apache Kafka i HDInsight via ett virtuellt Azure-nätverk. Det här dokumentet innehåller information om hur du ansluter till Kafka med hjälp av följande konfigurationer:
- Från resurser i ett lokalt nätverk. Den här anslutningen upprättas med hjälp av en VPN-enhet (programvara eller maskinvara) i ditt lokala nätverk.
- Från en utvecklingsmiljö med hjälp av en VPN-programvaruklient.
Kommentar
Vi rekommenderar att du använder Azure Az PowerShell-modulen för att interagera med Azure. Se Installera Azure PowerShell för att komma igång. Information om hur du migrerar till Az PowerShell-modulen finns i artikeln om att migrera Azure PowerShell från AzureRM till Az.
Arkitektur och planering
HDInsight tillåter inte direkt anslutning till Kafka via det offentliga Internet. Kafka-klienter (producenter och konsumenter) måste i stället använda någon av följande anslutningsmetoder:
Kör klienten i samma virtuella nätverk som Kafka på HDInsight. Den här konfigurationen används i dokumentet Starta med Apache Kafka i HDInsight . Klienten körs direkt på HDInsight-klusternoderna eller på en annan virtuell dator i samma nätverk.
Anslut ett privat nätverk, till exempel ditt lokala nätverk, till det virtuella nätverket. Med den här konfigurationen kan klienter i ditt lokala nätverk arbeta direkt med Kafka. Utför följande uppgifter för att aktivera den här konfigurationen:
Skapa ett virtuellt nätverk.
Skapa en VPN-gateway som använder en plats-till-plats-konfiguration. Konfigurationen som används i det här dokumentet ansluter till en VPN-gatewayenhet i ditt lokala nätverk.
Skapa en DNS-server i det virtuella nätverket.
Konfigurera vidarebefordran mellan DNS-servern i varje nätverk.
Skapa en Kafka på HDInsight-kluster i det virtuella nätverket.
Mer information finns i avsnittet Anslut till Apache Kafka från ett lokalt nätverk .
Ansluta enskilda datorer till det virtuella nätverket med hjälp av en VPN-gateway och VPN-klient. Utför följande uppgifter för att aktivera den här konfigurationen:
Skapa ett virtuellt nätverk.
Skapa en VPN-gateway som använder en punkt-till-plats-konfiguration. Den här konfigurationen kan användas med både Windows- och macOS-klienter.
Skapa en Kafka på HDInsight-kluster i det virtuella nätverket.
Konfigurera Kafka för IP-reklam. Med den här konfigurationen kan klienten ansluta med hjälp av asynkrona IP-adresser i stället för domännamn.
Ladda ned och använd VPN-klienten i utvecklingssystemet.
Mer information finns i avsnittet Anslut till Apache Kafka med en VPN-klient .
Varning
Den här konfigurationen rekommenderas endast i utvecklingssyfte på grund av följande begränsningar:
- Varje klient måste ansluta med en VPN-programvaruklient.
- VPN-klienten skickar inte namnmatchningsbegäranden till det virtuella nätverket, så du måste använda IP-adresser för att kommunicera med Kafka. IP-kommunikation kräver ytterligare konfiguration i Kafka-klustret.
Mer information om hur du använder HDInsight i ett virtuellt nätverk finns i Planera ett virtuellt nätverk för Azure HDInsight-kluster.
Ansluta till Apache Kafka från ett lokalt nätverk
Om du vill skapa ett Kafka-kluster som kommunicerar med ditt lokala nätverk följer du stegen i dokumentet Anslut HDInsight till ditt lokala nätverk .
Viktigt!
När du skapar HDInsight-klustret väljer du Kafka-klustertypen .
De här stegen skapar följande konfiguration:
- Azure Virtual Network
- Plats-till-plats-VPN-gateway
- Azure Storage-konto (används av HDInsight)
- Kafka på HDInsight
Om du vill kontrollera att en Kafka-klient kan ansluta till klustret lokalt använder du stegen i avsnittet Exempel: Python-klient .
Ansluta till Apache Kafka med en VPN-klient
Använd stegen i det här avsnittet för att skapa följande konfiguration:
- Azure Virtual Network
- Punkt-till-plats-VPN-gateway
- Azure Storage-konto (används av HDInsight)
- Kafka på HDInsight
Följ stegen i dokumentet Arbeta med självsignerade certifikat för punkt-till-plats-anslutningar . Det här dokumentet skapar de certifikat som behövs för gatewayen.
Öppna en PowerShell-prompt och använd följande kod för att logga in på din Azure-prenumeration:
Connect-AzAccount # If you have multiple subscriptions, uncomment to set the subscription #Select-AzSubscription -SubscriptionName "name of your subscription"
Använd följande kod för att skapa variabler som innehåller konfigurationsinformation:
# Prompt for generic information $resourceGroupName = Read-Host "What is the resource group name?" $baseName = Read-Host "What is the base name? It is used to create names for resources, such as 'net-basename' and 'kafka-basename':" $location = Read-Host "What Azure Region do you want to create the resources in?" $rootCert = Read-Host "What is the file path to the root certificate? It is used to secure the VPN gateway." # Prompt for HDInsight credentials $adminCreds = Get-Credential -Message "Enter the HTTPS user name and password for the HDInsight cluster" -UserName "admin" $sshCreds = Get-Credential -Message "Enter the SSH user name and password for the HDInsight cluster" -UserName "sshuser" # Names for Azure resources $networkName = "net-$baseName" $clusterName = "kafka-$baseName" $storageName = "store$baseName" # Can't use dashes in storage names $defaultContainerName = $clusterName $defaultSubnetName = "default" $gatewaySubnetName = "GatewaySubnet" $gatewayPublicIpName = "GatewayIp" $gatewayIpConfigName = "GatewayConfig" $vpnRootCertName = "rootcert" $vpnName = "VPNGateway" # Network settings $networkAddressPrefix = "10.0.0.0/16" $defaultSubnetPrefix = "10.0.0.0/24" $gatewaySubnetPrefix = "10.0.1.0/24" $vpnClientAddressPool = "172.16.201.0/24" # HDInsight settings $hdiWorkerNodes = 4 $hdiVersion = "3.6" $hdiType = "Kafka"
Använd följande kod för att skapa Azure-resursgruppen och det virtuella nätverket:
# Create the resource group that contains everything New-AzResourceGroup -Name $resourceGroupName -Location $location # Create the subnet configuration $defaultSubnetConfig = New-AzVirtualNetworkSubnetConfig -Name $defaultSubnetName ` -AddressPrefix $defaultSubnetPrefix $gatewaySubnetConfig = New-AzVirtualNetworkSubnetConfig -Name $gatewaySubnetName ` -AddressPrefix $gatewaySubnetPrefix # Create the subnet New-AzVirtualNetwork -Name $networkName ` -ResourceGroupName $resourceGroupName ` -Location $location ` -AddressPrefix $networkAddressPrefix ` -Subnet $defaultSubnetConfig, $gatewaySubnetConfig # Get the network & subnet that were created $network = Get-AzVirtualNetwork -Name $networkName ` -ResourceGroupName $resourceGroupName $gatewaySubnet = Get-AzVirtualNetworkSubnetConfig -Name $gatewaySubnetName ` -VirtualNetwork $network $defaultSubnet = Get-AzVirtualNetworkSubnetConfig -Name $defaultSubnetName ` -VirtualNetwork $network # Set a dynamic public IP address for the gateway subnet $gatewayPublicIp = New-AzPublicIpAddress -Name $gatewayPublicIpName ` -ResourceGroupName $resourceGroupName ` -Location $location ` -AllocationMethod Dynamic $gatewayIpConfig = New-AzVirtualNetworkGatewayIpConfig -Name $gatewayIpConfigName ` -Subnet $gatewaySubnet ` -PublicIpAddress $gatewayPublicIp # Get the certificate info # Get the full path in case a relative path was passed $rootCertFile = Get-ChildItem $rootCert $cert = New-Object System.Security.Cryptography.X509Certificates.X509Certificate2($rootCertFile) $certBase64 = [System.Convert]::ToBase64String($cert.RawData) $p2sRootCert = New-AzVpnClientRootCertificate -Name $vpnRootCertName ` -PublicCertData $certBase64 # Create the VPN gateway New-AzVirtualNetworkGateway -Name $vpnName ` -ResourceGroupName $resourceGroupName ` -Location $location ` -IpConfigurations $gatewayIpConfig ` -GatewayType Vpn ` -VpnType RouteBased ` -EnableBgp $false ` -GatewaySku Standard ` -VpnClientAddressPool $vpnClientAddressPool ` -VpnClientRootCertificates $p2sRootCert
Varning
Det kan ta flera minuter för den här processen att slutföras.
Använd följande kod för att skapa Azure Storage-kontot och blobcontainern:
# Create the storage account New-AzStorageAccount ` -ResourceGroupName $resourceGroupName ` -Name $storageName ` -SkuName Standard_GRS ` -Location $location ` -Kind StorageV2 ` -EnableHttpsTrafficOnly 1 # Get the storage account keys and create a context $defaultStorageKey = (Get-AzStorageAccountKey -ResourceGroupName $resourceGroupName ` -Name $storageName)[0].Value $storageContext = New-AzStorageContext -StorageAccountName $storageName ` -StorageAccountKey $defaultStorageKey # Create the default storage container New-AzStorageContainer -Name $defaultContainerName ` -Context $storageContext
Använd följande kod för att skapa HDInsight-klustret:
# Create the HDInsight cluster New-AzHDInsightCluster ` -ResourceGroupName $resourceGroupName ` -ClusterName $clusterName ` -Location $location ` -ClusterSizeInNodes $hdiWorkerNodes ` -ClusterType $hdiType ` -OSType Linux ` -Version $hdiVersion ` -HttpCredential $adminCreds ` -SshCredential $sshCreds ` -DefaultStorageAccountName "$storageName.blob.core.windows.net" ` -DefaultStorageAccountKey $defaultStorageKey ` -DefaultStorageContainer $defaultContainerName ` -DisksPerWorkerNode 2 ` -VirtualNetworkId $network.Id ` -SubnetName $defaultSubnet.Id
Varning
Den här processen tar cirka 15 minuter att slutföra.
Konfigurera Kafka för IP-reklam
Som standard returnerar Apache Zookeeper domännamnet för Kafka-koordinatorerna till klienter. Den här konfigurationen fungerar inte med VPN-programvaruklienten eftersom den inte kan använda namnmatchning för entiteter i det virtuella nätverket. För den här konfigurationen använder du följande steg för att konfigurera Kafka att annonsera IP-adresser i stället för domännamn:
Via en webbläsare går du till
https://CLUSTERNAME.azurehdinsight.net
. ErsättCLUSTERNAME
med namnet på Kafka i HDInsight-klustret.När du uppmanas till det använder du HTTPS-användarnamnet och lösenordet för klustret. Ambari-webbgränssnittet för klustret visas.
Om du vill visa information om Kafka väljer du Kafka i listan till vänster.
Om du vill visa Kafka-konfigurationen väljer du Konfigurationer längst upp i mitten.
Om du vill hitta kafka-env-konfigurationen anger du
kafka-env
i fältet Filter längst upp till höger.Om du vill konfigurera Kafka för att annonsera IP-adresser lägger du till följande text längst ned i fältet kafka-env-template :
# Configure Kafka to advertise IP addresses instead of FQDN IP_ADDRESS=$(hostname -i) echo advertised.listeners=$IP_ADDRESS sed -i.bak -e '/advertised/{/advertised@/!d;}' /usr/hdp/current/kafka-broker/conf/server.properties echo "advertised.listeners=PLAINTEXT://$IP_ADDRESS:9092" >> /usr/hdp/current/kafka-broker/conf/server.properties
Om du vill konfigurera det gränssnitt som Kafka lyssnar på anger du
listeners
i fältet Filter längst upp till höger.Om du vill konfigurera Kafka att lyssna på alla nätverksgränssnitt ändrar du värdet i lyssnarfältet till
PLAINTEXT://0.0.0.0:9092
.Spara konfigurationsändringarna genom att använda knappen Spara . Ange ett textmeddelande som beskriver ändringarna. Välj OK när ändringarna har sparats.
Om du vill förhindra fel vid omstart av Kafka använder du knappen Tjänståtgärder och väljer Aktivera underhållsläge. Välj OK för att slutföra den här åtgärden.
Om du vill starta om Kafka använder du knappen Starta om och väljer Starta om alla berörda. Bekräfta omstarten och använd sedan ok-knappen när åtgärden har slutförts.
Om du vill inaktivera underhållsläget använder du knappen Service Actions (Tjänståtgärder) och väljer Inaktivera underhållsläge. Välj OK för att slutföra den här åtgärden.
Ansluta till VPN-gatewayen
Om du vill ansluta till VPN-gatewayen använder du avsnittet Anslut till Azure i dokumentet Konfigurera en punkt-till-plats-anslutning .
Exempel: Python-klient
Om du vill verifiera anslutningen till Kafka använder du följande steg för att skapa och köra en Python-producent och -konsument:
Använd någon av följande metoder för att hämta det fullständigt kvalificerade domännamnet (FQDN) och IP-adresserna för noderna i Kafka-klustret:
$resourceGroupName = "The resource group that contains the virtual network used with HDInsight" $clusterNICs = Get-AzNetworkInterface -ResourceGroupName $resourceGroupName | where-object {$_.Name -like "*node*"} $nodes = @() foreach($nic in $clusterNICs) { $node = new-object System.Object $node | add-member -MemberType NoteProperty -name "Type" -value $nic.Name.Split('-')[1] $node | add-member -MemberType NoteProperty -name "InternalIP" -value $nic.IpConfigurations.PrivateIpAddress $node | add-member -MemberType NoteProperty -name "InternalFQDN" -value $nic.DnsSettings.InternalFqdn $nodes += $node } $nodes | sort-object Type
az network nic list --resource-group <resourcegroupname> --output table --query "[?contains(name,'node')].{NICname:name,InternalIP:ipConfigurations[0].privateIpAddress,InternalFQDN:dnsSettings.internalFqdn}"
Det här skriptet förutsätter att
$resourceGroupName
är namnet på den Azure-resursgrupp som innehåller det virtuella nätverket.Spara den returnerade informationen för användning i nästa steg.
Använd följande för att installera kafka-python-klienten :
pip install kafka-python
Om du vill skicka data till Kafka använder du följande Python-kod:
from kafka import KafkaProducer # Replace the `ip_address` entries with the IP address of your worker nodes # NOTE: you don't need the full list of worker nodes, just one or two. producer = KafkaProducer(bootstrap_servers=['kafka_broker_1','kafka_broker_2']) for _ in range(50): producer.send('testtopic', b'test message')
Ersätt posterna
'kafka_broker'
med de adresser som returneras från steg 1 i det här avsnittet:Om du använder en VPN-klient för programvara ersätter du posterna
kafka_broker
med IP-adressen för dina arbetsnoder.Om du har aktiverat namnmatchning via en anpassad DNS-server ersätter du posterna
kafka_broker
med FQDN för arbetsnoderna.Kommentar
Den här koden skickar strängen
test message
till ämnettesttopic
. Standardkonfigurationen för Kafka i HDInsight är inte att skapa ämnet om det inte finns. Se Så här konfigurerar du Apache Kafka på HDInsight för att skapa ämnen automatiskt. Du kan också skapa ämnen manuellt innan du skapar meddelanden.
Om du vill hämta meddelandena från Kafka använder du följande Python-kod:
from kafka import KafkaConsumer # Replace the `ip_address` entries with the IP address of your worker nodes # Again, you only need one or two, not the full list. # Note: auto_offset_reset='earliest' resets the starting offset to the beginning # of the topic consumer = KafkaConsumer(bootstrap_servers=['kafka_broker_1','kafka_broker_2'],auto_offset_reset='earliest') consumer.subscribe(['testtopic']) for msg in consumer: print (msg)
Ersätt posterna
'kafka_broker'
med de adresser som returneras från steg 1 i det här avsnittet:Om du använder en VPN-klient för programvara ersätter du posterna
kafka_broker
med IP-adressen för dina arbetsnoder.Om du har aktiverat namnmatchning via en anpassad DNS-server ersätter du posterna
kafka_broker
med FQDN för arbetsnoderna.
Nästa steg
Mer information om hur du använder HDInsight med ett virtuellt nätverk finns i dokumentet Planera en distribution av virtuella nätverk för Azure HDInsight-kluster .
Mer information om hur du skapar ett virtuellt Azure-nätverk med punkt-till-plats-VPN-gateway finns i följande dokument:
Konfigurera en punkt-till-plats-anslutning med hjälp av Azure Portal
Konfigurera en punkt-till-plats-anslutning med Azure PowerShell
Mer information om hur du arbetar med Apache Kafka i HDInsight finns i följande dokument: