// Modified from the book _Java in a Nutshell_ by David Flanagan. // Portions Copyright (c) 1996 O'Reilly & Associates // Portions Copyright (c) 1996 Pencom Systems Inc. // Provided WITHOUT WARRANTY either expressed or implied. import java.net.*; import java.io.*; import java.awt.*; import java.util.*; import StreamPump; import Message; import ChatCodes; public class Server extends Thread { public final static int DEFAULT_PORT = 6789; protected int port; protected ServerSocket listen_socket; protected ThreadGroup threadgroup; protected List connection_list; protected Vector connections; protected Vulture vulture; protected Hub hub; // Exit with an error message, when an exception occurs. public static void fail(Exception e, String msg) { System.err.println(msg + ": " + e); System.exit(1); } // Create a ServerSocket to listen for connections on; start the thread. public Server(int port) { // Create our server thread with a name. super("Server"); if (port == 0) port = DEFAULT_PORT; this.port = port; try { listen_socket = new ServerSocket(port); } catch (IOException e) fail(e, "Exception creating server socket"); // Create a threadgroup for our connections threadgroup = new ThreadGroup("Server Connections"); // Create a window to display our connections in Frame f = new Frame("Server Status"); connection_list = new List(); f.add("Center", connection_list); f.resize(400, 200); f.show(); // Initialize a vector to store our connections in connections = new Vector(); // Create a Vulture thread to wait for other threads to die. // It starts itself automatically. vulture = new Vulture(this); // Create a Hub for communication between Connections hub = new Hub(this); // Start the server listening for connections this.start(); } // The body of the server thread. Loop forever, listening for and // accepting connections from clients. For each connection, // create a Connection object to handle communication through the // new Socket. When we create a new connection, add it to the // Vector of connections, and display it in the List. Note that we // use synchronized to lock the Vector of connections. The Vulture // class does the same, so the vulture won't be removing dead // connections while we're adding fresh ones. public void run() { try { while(true) { Socket client_socket = listen_socket.accept(); Connection c = new Connection(client_socket, threadgroup, 3, vulture, hub); // prevent simultaneous access. synchronized (connections) { connections.addElement(c); connection_list.addItem(c.toString()); } } } catch (IOException e) fail(e, "Exception while listening for connections"); } // Start the server up, listening on an optionally specified port public static void main(String[] args) { int port = 0; if (args.length == 1) { try port = Integer.parseInt(args[0]); catch (NumberFormatException e) port = 0; } new Server(port); } } // This class is the thread that handles all communication with a client // It also notifies the Vulture when the connection is dropped. class Connection extends Thread { static int connection_number = 0; static String WELCOME_MESSAGE = "0|Server|Welcome to the LittleChat Server."; protected Socket client; protected Vulture vulture; // Streams from and to the network. protected DataInputStream in; protected PrintStream out; // The hub to facilitate communication between Connections protected Hub hub; // The pipe to listen to the hub // (note: each Connection has its own pipe) protected BufferedInputStream bufIn; protected PipedInputStream pipeIn; protected PipedOutputStream pipeOut; protected DataInputStream fromHub; protected PrintStream toConn; protected Courier courierToHub; protected StreamPump pumpToClient; String userName = ""; // Initialize the streams and start the thread public Connection(Socket client_socket, ThreadGroup threadgroup, int priority, Vulture vulture, Hub hub) { // Give the thread a group, a name, and a priority. super(threadgroup, "Connection-" + connection_number++); this.setPriority(priority); // Save our other arguments away client = client_socket; this.vulture = vulture; this.hub = hub; // Create the streams try { // network socket streams in = new DataInputStream(client.getInputStream()); out = new PrintStream(client.getOutputStream()); // pipe from the Hub pipeOut = new PipedOutputStream(); pipeIn = new PipedInputStream(pipeOut); fromHub = new DataInputStream(pipeIn); toConn = new PrintStream(pipeOut); } catch (IOException e) { disconnect(); return; } // And start the thread up this.start(); } // Return a PrintStream to talk to this connection public PrintStream getPrintStream() { return toConn; } // Close the connection to this client synchronized void disconnect() { try { client.close(); } catch (IOException e) ; finally { synchronized (vulture) vulture.notify(); courierToHub = null; pumpToClient = null; } } // Provide the service. public void run() { String line; int len; try { out.println(WELCOME_MESSAGE); courierToHub = new Courier(in, this); pumpToClient = new StreamPump(fromHub, out, this); this.suspend(); // let the pumps do their work } catch (Exception e) ; // When we're done, for whatever reason, be sure to close // the socket, and to notify the Vulture object. Note that // we have to use synchronized first to lock the vulture // object before we can call notify() for it. finally { disconnect(); } } // This method returns the string representation of the Connection. // This is the string that will appear in the GUI List. public String toString() { return this.getName() + " connected to: " + client.getInetAddress().getHostName() + ":" + client.getPort(); } public synchronized String getUserName() { return userName; } public synchronized void setUserName(String name) { userName = name; } } // This class waits to be notified that a thread is dying (exiting) // and then cleans up the list of threads and the graphical list. class Vulture extends Thread { protected Server server; protected Vulture(Server s) { super(s.threadgroup, "Connection Vulture"); server = s; this.start(); } // This is the method that waits for notification of exiting threads // and cleans up the lists. It is a synchronized method, so it // acquires a lock on the `this' object before running. This is // necessary so that it can call wait() on this. Even if the // the Connection objects never call notify(), this method wakes up // every five seconds and checks all the connections, just in case. // Note also that all access to the Vector of connections and to // the GUI List component are within a synchronizl notify(), this method wakes up // every five seconds and checks all the connections, just in case. // Note also that all access to the Vector of connections and to // the GUI List component are within a synchronized block as well. // This prevents the Server class from adding a new conenction while // we're removing an old one. public synchronized void run() { for(;;) { try this.wait(5000); catch (InterruptedException e) ; // prevent simultaneous access synchronized(server.connections) { // loop through the connections for(int i = 0; i < server.connections.size(); i++) { Connection c; c = (Connection)server.connections.elementAt(i); // if the connection thread isn't alive anymore, // remove it from the Vector and List. if (!c.isAlive()) { server.connections.removeElementAt(i); server.connection_list.delItem(i); i--; } } } } } } class Hub extends Thread implements ChatCodes { protected Server server; // the Server who spawned this Hub protected Vector messages; protected Vector userNames; public Hub(Server s) { super(s.threadgroup, "Chat Server Hub"); server = s; messages = new Vector(); userNames = new Vector(); // Start up the thread. this.start(); } public synchronized void addMessage(Message msg) { messages.addElement(msg); this.resume(); // let the hub know there's a message } public synchronized Message popMessageQueue() { Message msg = (Message) messages.elementAt(0); messages.removeElementAt(0); return msg; } public synchronized String allUsers() { StringBuffer names = new StringBuffer(); for (int i=0; i < userNames.size(); i++) { names.append((String) userNames.elementAt(i)); names.append("|"); } return names.toString(); } String processMessage(Message msg) { // Do any necessary processing required by the message code // Return a string which is to be broadcast to all clients // in response to this message (null if no further action required) switch (msg.getCode()) { case _NORMAL_MESSAGE: { return msg.toString(); } case _CLOSE_CONNECTION: { // close the Connection Connection sender = (Connection) msg.getSender(); // resume the Connection thread // (it was only waiting to disconnect) synchronized (sender) sender.resume(); while (userNames.contains(msg.getUser())) { userNames.removeElement(msg.getUser()); } // Tell everyone the news... return (new Message(_REMOVE_USER, sender.getUserName(), sender.getUserName(), this)).toString(); } case _OPEN_CONNECTION: { // not needed? return null; } case _SET_USER_NAME: { // sent at user login Connection sender = (Connection) msg.getSender(); sender.setUserName(msg.getUser()); userNames.addElement(msg.getUser()); return (new Message(_ADD_NEW_USER, msg)).toString(); } case _REQUEST_USER_LIST: { String names = allUsers(); Message outgoing = new Message(_USER_LIST, "Server", names, this); return outgoing.toString(); } default: { return msg.toString(); } } } public void run() { Message msg; String line = null; Connection oneConnection; PrintStream ps; // FOR DEBUGGING System.out.println("Hub thread started..."); while (true) { // block until there is a message while (messages.size() == 0) { this.suspend(); // ... wait for someone to add a message } // GET THE MESSAGE msg = popMessageQueue(); // FOR DEBUGGING System.out.println("Hub got a message: "); System.out.println("\tcode = " + msg.getCode()); System.out.println("\tuser = " + msg.getUser()); System.out.println("\ttext = " + msg.getText()); // PROCESS THE MESSAGE line = processMessage(msg); // BROADCAST THE MESSAGE... if (line != null) { synchronized (server.connections) { for (Enumeration e = server.connections.elements(); e.hasMoreElements(); ) { oneConnection = (Connection) e.nextElement(); ps = oneConnection.getPrintStream(); ps.println(line); // FOR DEBUGGING System.out.println("Hub is Sending: " + line); } } } } } } /** * The Courier gets messages and adds them to the * hub's message queue. */ class Courier extends Thread { DataInputStream in; PrintStream out; Connection master; public Courier(InputStream iStream, Connection m) { super("Courier for " + m.getName()); master = m; if (iStream instanceof DataInputStream) { in = (DataInputStream) iStream; } else { in = new DataInputStream(iStream); } start(); } public void run() { String line; Message msg; try { while(true) { line = in.readLine(); if (line == null) { throw new IOException(this.getName() + " received null."); } // FOR DEBUGGING System.out.println(this.getName() + " received: " + line); System.out.println("Converting String to message..."); msg = new Message(line, master); // FOR DEBUGGING System.out.println("Placing on message queue..."); master.hub.addMessage(msg); } } catch (IOException e) { // Couldn't read from input. Give up. synchronized (master) { master.resume(); } } } }